tillrohrmann commented on a change in pull request #13722:
URL: https://github.com/apache/flink/pull/13722#discussion_r511376073



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<SlotOffer> candidates = new ArrayList<>();
+
+               // filter out already accepted offers
+               for (SlotOffer offer : offers) {
+                       final AllocationID allocationId = 
offer.getAllocationId();
+                       if (slotPool.containsSlot(allocationId)) {
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               candidates.add(offer);
+                       }
+               }
+
+               final Collection<SlotOfferMatching> matchings = 
matchOffersWithOutstandingRequirements(candidates);
+
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+               ResourceCounter acceptedResources = ResourceCounter.empty();
+
+               for (SlotOfferMatching matching : matchings) {
+                       if (matching.getMatching().isPresent()) {
+                               final ResourceProfile matchedResourceProfile = 
matching.getMatching().get();
+
+                               final AllocatedSlot allocatedSlot = 
createAllocatedSlot(
+                                       matching.getSlotOffer(),
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                               acceptedSlots.add(allocatedSlot);
+                               acceptedSlotOffers.add(matching.getSlotOffer());
+
+                               acceptedResources = 
acceptedResources.add(matchedResourceProfile, 1);
+
+                               // store the ResourceProfile against which the 
given slot has matched for future book-keeping
+                               
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
matchedResourceProfile);

Review comment:
       Maybe: Could be factored out as well.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Slot pool interface which uses Flink's declarative resource management
+ * protocol to acquire resources.
+ *
+ * <p>In order to acquire new resources, users need to increase the required
+ * resources. Once they no longer need the resources, users need to decrease
+ * the required resources so that superfluous resources can be returned.
+ */
+public interface DeclarativeSlotPool {
+
+       /**
+        * Increases the resource requirements by increment.
+        *
+        * @param increment increment by which to increase the resource 
requirements
+        */
+       void increaseResourceRequirementsBy(ResourceCounter increment);
+
+       /**
+        * Decreases the resource requirements by decrement.
+        *
+        * @param decrement decrement by which to decrease the resource 
requirements
+        */
+       void decreaseResourceRequirementsBy(ResourceCounter decrement);
+
+       /**
+        * Returns the current resource requirements.
+        *
+        * @return current resource requirements
+        */
+       Collection<ResourceRequirement> getResourceRequirements();
+
+       /**
+        * Offers slots to this slot pool. The slot pool is free to accept as 
many slots as it
+        * needs.
+        *
+        * @param offers offers containing the list of slots offered to this 
slot pool
+        * @param taskManagerLocation taskManagerLocation is the location of 
the offering TaskExecutor
+        * @param taskManagerGateway taskManagerGateway is the gateway to talk 
to the offering TaskExecutor
+        * @param currentTime currentTime is the time the slots are being 
offered
+        * @return collection of accepted slots; the other slot offers are 
implicitly rejected
+        */
+       Collection<SlotOffer> offerSlots(Collection<? extends SlotOffer> 
offers, TaskManagerLocation taskManagerLocation, TaskManagerGateway 
taskManagerGateway, long currentTime);
+
+       /**
+        * Returns the slot information for all free slots (slots which can be 
allocated from the slot pool).
+        *
+        * @return collection of free slot information
+        */
+       Collection<SlotInfoWithUtilization> getFreeSlotsInformation();
+
+       /**
+        * Returns the slot information for all slots (free and allocated 
slots).
+        *
+        * @return collection of slot information
+        */
+       Collection<? extends SlotInfo> getAllSlotsInformation();
+
+       /**
+        * Reserves the free slot identified by the given allocationId and maps 
it to
+        * the given requiredSlotProfile.
+        *
+        * @param allocationId allocationId identifies the free slot to allocate
+        * @param requiredSlotProfile requiredSlotProfile specifying the 
resource requirement
+        * @return a PhysicalSlot representing the allocated slot
+        * @throws IllegalStateException if no free slot with the given 
allocationId exists or if
+        *                               the specified slot cannot fulfill the 
requiredSlotProfile
+        */
+       PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile 
requiredSlotProfile);
+
+       /**
+        * Frees the reserved a slot identified by the given allocationId. If 
no slot

Review comment:
       nit: remove "a"

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<SlotOffer> candidates = new ArrayList<>();
+
+               // filter out already accepted offers
+               for (SlotOffer offer : offers) {
+                       final AllocationID allocationId = 
offer.getAllocationId();
+                       if (slotPool.containsSlot(allocationId)) {
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               candidates.add(offer);
+                       }
+               }

Review comment:
       one could factor this out into a separate method.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<SlotOffer> candidates = new ArrayList<>();
+
+               // filter out already accepted offers
+               for (SlotOffer offer : offers) {
+                       final AllocationID allocationId = 
offer.getAllocationId();
+                       if (slotPool.containsSlot(allocationId)) {
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               candidates.add(offer);
+                       }
+               }
+
+               final Collection<SlotOfferMatching> matchings = 
matchOffersWithOutstandingRequirements(candidates);
+
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+               ResourceCounter acceptedResources = ResourceCounter.empty();
+
+               for (SlotOfferMatching matching : matchings) {
+                       if (matching.getMatching().isPresent()) {
+                               final ResourceProfile matchedResourceProfile = 
matching.getMatching().get();
+
+                               final AllocatedSlot allocatedSlot = 
createAllocatedSlot(
+                                       matching.getSlotOffer(),
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                               acceptedSlots.add(allocatedSlot);
+                               acceptedSlotOffers.add(matching.getSlotOffer());
+
+                               acceptedResources = 
acceptedResources.add(matchedResourceProfile, 1);
+
+                               // store the ResourceProfile against which the 
given slot has matched for future book-keeping
+                               
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
matchedResourceProfile);
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+               increaseAvailableResources(acceptedResources);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Collection<SlotOfferMatching> 
matchOffersWithOutstandingRequirements(Collection<SlotOffer> slotOffers) {
+               ResourceCounter unfulfilledResources = 
calculateUnfulfilledResources();
+
+               final Collection<SlotOfferMatching> matching = new 
ArrayList<>();
+
+               for (SlotOffer slotOffer : slotOffers) {
+                       ResourceProfile matchingResourceProfile = null;
+
+                       if 
(unfulfilledResources.containsResource(slotOffer.getResourceProfile())) {
+                               unfulfilledResources = 
unfulfilledResources.subtract(slotOffer.getResourceProfile(), 1);
+
+                               // use the profile of the slot offer since it 
is a direct match against the requirement
+                               matchingResourceProfile = 
slotOffer.getResourceProfile();
+                       } else {
+                               for (ResourceProfile unfulfilledResource : 
unfulfilledResources.getResources()) {
+                                       if 
(slotOffer.getResourceProfile().isMatching(unfulfilledResource)) {
+                                               matchingResourceProfile = 
unfulfilledResource;
+                                               break;
+                                       }
+                               }
+                       }
+
+                       final SlotOfferMatching slotOfferMatching;
+                       if (matchingResourceProfile != null) {
+                               slotOfferMatching = 
SlotOfferMatching.createMatching(slotOffer, matchingResourceProfile);
+                       } else {
+                               slotOfferMatching = 
SlotOfferMatching.createMismatch(slotOffer);
+                       }
+
+                       matching.add(slotOfferMatching);
+               }
+
+               return matching;
+       }
+
+       @VisibleForTesting
+       ResourceCounter calculateUnfulfilledResources() {
+               return 
totalResourceRequirements.subtract(fulfilledResourceRequirements);
+       }
+
+       private AllocatedSlot createAllocatedSlot(
+               SlotOffer slotOffer,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway) {
+               return new AllocatedSlot(
+                       slotOffer.getAllocationId(),
+                       taskManagerLocation,
+                       slotOffer.getSlotIndex(),
+                       slotOffer.getResourceProfile(),
+                       taskManagerGateway);
+       }
+
+       private void increaseAvailableResources(ResourceCounter 
acceptedResources) {
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(acceptedResources);
+       }
+
+       @Nonnull
+       private ResourceProfile getMatchingResourceProfile(AllocationID 
allocationId) {
+               return 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), 
"No matching resource profile found for %s", allocationId);
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               final AllocatedSlot allocatedSlot = 
slotPool.reserveFreeSlot(allocationId);
+
+               
Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
 "");
+
+               ResourceProfile previouslyMatchedResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));
+
+               updateSlotToRequirementProfileMapping(allocationId, 
requiredSlotProfile);
+               // slots can be reserved for a requirement that is no in line 
with the mapping we computed when the slot was

Review comment:
       typo: `is not in line`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<SlotOffer> candidates = new ArrayList<>();
+
+               // filter out already accepted offers
+               for (SlotOffer offer : offers) {
+                       final AllocationID allocationId = 
offer.getAllocationId();
+                       if (slotPool.containsSlot(allocationId)) {
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               candidates.add(offer);
+                       }
+               }
+
+               final Collection<SlotOfferMatching> matchings = 
matchOffersWithOutstandingRequirements(candidates);
+
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+               ResourceCounter acceptedResources = ResourceCounter.empty();
+
+               for (SlotOfferMatching matching : matchings) {
+                       if (matching.getMatching().isPresent()) {
+                               final ResourceProfile matchedResourceProfile = 
matching.getMatching().get();
+
+                               final AllocatedSlot allocatedSlot = 
createAllocatedSlot(
+                                       matching.getSlotOffer(),
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                               acceptedSlots.add(allocatedSlot);
+                               acceptedSlotOffers.add(matching.getSlotOffer());
+
+                               acceptedResources = 
acceptedResources.add(matchedResourceProfile, 1);
+
+                               // store the ResourceProfile against which the 
given slot has matched for future book-keeping
+                               
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
matchedResourceProfile);
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+               increaseAvailableResources(acceptedResources);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Collection<SlotOfferMatching> 
matchOffersWithOutstandingRequirements(Collection<SlotOffer> slotOffers) {
+               ResourceCounter unfulfilledResources = 
calculateUnfulfilledResources();
+
+               final Collection<SlotOfferMatching> matching = new 
ArrayList<>();
+
+               for (SlotOffer slotOffer : slotOffers) {
+                       ResourceProfile matchingResourceProfile = null;
+
+                       if 
(unfulfilledResources.containsResource(slotOffer.getResourceProfile())) {
+                               unfulfilledResources = 
unfulfilledResources.subtract(slotOffer.getResourceProfile(), 1);
+
+                               // use the profile of the slot offer since it 
is a direct match against the requirement
+                               matchingResourceProfile = 
slotOffer.getResourceProfile();
+                       } else {
+                               for (ResourceProfile unfulfilledResource : 
unfulfilledResources.getResources()) {
+                                       if 
(slotOffer.getResourceProfile().isMatching(unfulfilledResource)) {
+                                               matchingResourceProfile = 
unfulfilledResource;

Review comment:
       Shouldn't we also adjust `unfulfilledResources` here? Concretely, I 
think we should write `unfulfilledResources = 
unfulfilledResources.subtract(slotOffer.getResourceProfile(), 1);` or move it 
out of the two branches and unify it.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;

Review comment:
       Hmm, the `JobScopedResourceTracker` does a bit more than this simple map 
here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;

Review comment:
       But I agree that the matching should be in sync between the RM and the 
JM side.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<SlotOffer> candidates = new ArrayList<>();
+
+               // filter out already accepted offers
+               for (SlotOffer offer : offers) {
+                       final AllocationID allocationId = 
offer.getAllocationId();
+                       if (slotPool.containsSlot(allocationId)) {
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               candidates.add(offer);
+                       }
+               }
+
+               final Collection<SlotOfferMatching> matchings = 
matchOffersWithOutstandingRequirements(candidates);
+
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+               ResourceCounter acceptedResources = ResourceCounter.empty();
+
+               for (SlotOfferMatching matching : matchings) {
+                       if (matching.getMatching().isPresent()) {
+                               final ResourceProfile matchedResourceProfile = 
matching.getMatching().get();
+
+                               final AllocatedSlot allocatedSlot = 
createAllocatedSlot(
+                                       matching.getSlotOffer(),
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                               acceptedSlots.add(allocatedSlot);
+                               acceptedSlotOffers.add(matching.getSlotOffer());
+
+                               acceptedResources = 
acceptedResources.add(matchedResourceProfile, 1);
+
+                               // store the ResourceProfile against which the 
given slot has matched for future book-keeping
+                               
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
matchedResourceProfile);
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+               increaseAvailableResources(acceptedResources);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Collection<SlotOfferMatching> 
matchOffersWithOutstandingRequirements(Collection<SlotOffer> slotOffers) {
+               ResourceCounter unfulfilledResources = 
calculateUnfulfilledResources();
+
+               final Collection<SlotOfferMatching> matching = new 
ArrayList<>();
+
+               for (SlotOffer slotOffer : slotOffers) {
+                       ResourceProfile matchingResourceProfile = null;
+
+                       if 
(unfulfilledResources.containsResource(slotOffer.getResourceProfile())) {
+                               unfulfilledResources = 
unfulfilledResources.subtract(slotOffer.getResourceProfile(), 1);
+
+                               // use the profile of the slot offer since it 
is a direct match against the requirement
+                               matchingResourceProfile = 
slotOffer.getResourceProfile();
+                       } else {
+                               for (ResourceProfile unfulfilledResource : 
unfulfilledResources.getResources()) {
+                                       if 
(slotOffer.getResourceProfile().isMatching(unfulfilledResource)) {
+                                               matchingResourceProfile = 
unfulfilledResource;
+                                               break;
+                                       }
+                               }
+                       }
+
+                       final SlotOfferMatching slotOfferMatching;
+                       if (matchingResourceProfile != null) {
+                               slotOfferMatching = 
SlotOfferMatching.createMatching(slotOffer, matchingResourceProfile);
+                       } else {
+                               slotOfferMatching = 
SlotOfferMatching.createMismatch(slotOffer);
+                       }
+
+                       matching.add(slotOfferMatching);
+               }
+
+               return matching;
+       }
+
+       @VisibleForTesting
+       ResourceCounter calculateUnfulfilledResources() {
+               return 
totalResourceRequirements.subtract(fulfilledResourceRequirements);
+       }
+
+       private AllocatedSlot createAllocatedSlot(
+               SlotOffer slotOffer,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway) {
+               return new AllocatedSlot(
+                       slotOffer.getAllocationId(),
+                       taskManagerLocation,
+                       slotOffer.getSlotIndex(),
+                       slotOffer.getResourceProfile(),
+                       taskManagerGateway);
+       }
+
+       private void increaseAvailableResources(ResourceCounter 
acceptedResources) {
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(acceptedResources);
+       }
+
+       @Nonnull
+       private ResourceProfile getMatchingResourceProfile(AllocationID 
allocationId) {
+               return 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), 
"No matching resource profile found for %s", allocationId);
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               final AllocatedSlot allocatedSlot = 
slotPool.reserveFreeSlot(allocationId);
+
+               
Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
 "");
+
+               ResourceProfile previouslyMatchedResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));
+
+               updateSlotToRequirementProfileMapping(allocationId, 
requiredSlotProfile);
+               // slots can be reserved for a requirement that is no in line 
with the mapping we computed when the slot was
+               // offered, so we have to adjust the requirements accordingly 
to ensure we still request enough slots to
+               // be able to fulfill the total requirements
+               adjustRequirements(previouslyMatchedResourceProfile, 
requiredSlotProfile);
+
+               return allocatedSlot;
+       }
+
+       @Override
+       public Optional<ResourceCounter> freeReservedSlot(AllocationID 
allocationId, @Nullable Throwable cause, long currentTime) {
+               LOG.debug("Release slot {}.", allocationId);
+
+               final Optional<AllocatedSlot> releasedSlot = 
slotPool.freeReservedSlot(allocationId, currentTime);
+
+               Optional<ResourceCounter> previouslyFulfilledRequirement = 
releasedSlot.map(Collections::singleton).map(this::getFulfilledRequirements);
+
+               releasedSlot.ifPresent(allocatedSlot -> {
+                       releasePayload(Collections.singleton(allocatedSlot), 
cause);
+                       tryToFulfillResourceRequirement(allocatedSlot);
+                       
notifyNewSlots.accept(Collections.singletonList(allocatedSlot));
+               });
+
+               return previouslyFulfilledRequirement;
+       }
+
+       private void tryToFulfillResourceRequirement(AllocatedSlot 
allocatedSlot) {
+               final Collection<SlotOfferMatching> slotOfferMatchings = 
matchOffersWithOutstandingRequirements(Collections.singleton(allocatedSlotToSlotOffer(allocatedSlot)));
+
+               for (SlotOfferMatching slotOfferMatching : slotOfferMatchings) {
+                       if (slotOfferMatching.getMatching().isPresent()) {
+                               final ResourceProfile matchedResourceProfile = 
slotOfferMatching.getMatching().get();
+
+                               
updateSlotToRequirementProfileMapping(allocatedSlot.getAllocationId(), 
matchedResourceProfile);
+                       }
+               }
+       }
+
+       private void updateSlotToRequirementProfileMapping(AllocationID 
allocationId, ResourceProfile matchedResourceProfile) {
+               final ResourceProfile oldResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.put(allocationId, 
matchedResourceProfile), "Expected slot profile matching to be non-empty.");
+
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(matchedResourceProfile, 1);
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.subtract(oldResourceProfile, 1);
+       }
+
+       private void adjustRequirements(ResourceProfile oldResourceProfile, 
ResourceProfile newResourceProfile) {
+               // slots can be reserved for a requirement that is no in line 
with the mapping we computed when the slot was
+               // offered, so we have to adjust the requirements accordingly 
to ensure we still request enough slots to
+               // be able to fulfill the total requirements
+               
decreaseResourceRequirementsBy(ResourceCounter.withResource(newResourceProfile, 
1));
+               
increaseResourceRequirementsBy(ResourceCounter.withResource(oldResourceProfile, 
1));
+       }

Review comment:
       Related to one of my previous comments: I am wondering whether this 
functionality is really necessary to keep in the `DefaultDeclarativeSlotPool`. 
I would hope that this is really only needed by the `FutureSlotPool`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<SlotOffer> candidates = new ArrayList<>();
+
+               // filter out already accepted offers
+               for (SlotOffer offer : offers) {
+                       final AllocationID allocationId = 
offer.getAllocationId();
+                       if (slotPool.containsSlot(allocationId)) {
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               candidates.add(offer);
+                       }
+               }
+
+               final Collection<SlotOfferMatching> matchings = 
matchOffersWithOutstandingRequirements(candidates);
+
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+               ResourceCounter acceptedResources = ResourceCounter.empty();
+
+               for (SlotOfferMatching matching : matchings) {

Review comment:
       But then, we couldn't reuse `matchOffersWithOutstandingRequirements` in 
`tryToFulfillResourceRequirement`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AutoRequirementDecrementingSlotPoolWrapper.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * A wrapper around a {@link DeclarativeSlotPool} that automatically 
decrements requirements if a slot is freed or
+ * released.
+ */
+class AutoRequirementDecrementingSlotPoolWrapper implements 
DeclarativeSlotPool {

Review comment:
       Yes, I think this might be cleaner.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AutoRequirementDecrementingSlotPoolWrapper.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * A wrapper around a {@link DeclarativeSlotPool} that automatically 
decrements requirements if a slot is freed or
+ * released.
+ */
+class AutoRequirementDecrementingSlotPoolWrapper implements 
DeclarativeSlotPool {
+
+       private final DeclarativeSlotPool backingSlotPool;
+
+       AutoRequirementDecrementingSlotPoolWrapper(DeclarativeSlotPool 
backingSlotPool) {
+               this.backingSlotPool = backingSlotPool;
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               backingSlotPool.increaseResourceRequirementsBy(increment);
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               backingSlotPool.decreaseResourceRequirementsBy(decrement);
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               return backingSlotPool.getResourceRequirements();
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(Collection<? extends SlotOffer> 
offers, TaskManagerLocation taskManagerLocation, TaskManagerGateway 
taskManagerGateway, long currentTime) {
+               return backingSlotPool.offerSlots(offers, taskManagerLocation, 
taskManagerGateway, currentTime);
+       }
+
+       @Override
+       public Collection<SlotInfoWithUtilization> getFreeSlotsInformation() {
+               return backingSlotPool.getFreeSlotsInformation();
+       }
+
+       @Override
+       public Collection<? extends SlotInfo> getAllSlotsInformation() {
+               return backingSlotPool.getAllSlotsInformation();
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               return backingSlotPool.reserveFreeSlot(allocationId, 
requiredSlotProfile);
+       }
+
+       @Override
+       public Optional<ResourceCounter> freeReservedSlot(AllocationID 
allocationId, @Nullable Throwable cause, long currentTime) {
+               Optional<ResourceCounter> previouslyFulfilledRequirement = 
backingSlotPool.freeReservedSlot(allocationId, cause, currentTime);
+               previouslyFulfilledRequirement
+                       
.ifPresent(backingSlotPool::decreaseResourceRequirementsBy);
+               return previouslyFulfilledRequirement;
+       }
+
+       @Override
+       public ResourceCounter releaseSlots(ResourceID owner, Exception cause) {
+               ResourceCounter previouslyFulfilledRequirement = 
backingSlotPool.releaseSlots(owner, cause);
+
+               
backingSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
+               return previouslyFulfilledRequirement;
+       }
+
+       @Override
+       public Optional<ResourceCounter> releaseSlot(AllocationID allocationId, 
Exception cause) {
+               Optional<ResourceCounter> previouslyFulfilledRequirements = 
backingSlotPool.releaseSlot(allocationId, cause);
+               previouslyFulfilledRequirements
+                       
.ifPresent(backingSlotPool::decreaseResourceRequirementsBy);
+               return previouslyFulfilledRequirements;
+       }

Review comment:
       Same here, shouldn't `releaseSlot` keep the requirements constant?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AutoRequirementDecrementingSlotPoolWrapper.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * A wrapper around a {@link DeclarativeSlotPool} that automatically 
decrements requirements if a slot is freed or
+ * released.
+ */
+class AutoRequirementDecrementingSlotPoolWrapper implements 
DeclarativeSlotPool {
+
+       private final DeclarativeSlotPool backingSlotPool;
+
+       AutoRequirementDecrementingSlotPoolWrapper(DeclarativeSlotPool 
backingSlotPool) {
+               this.backingSlotPool = backingSlotPool;
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               backingSlotPool.increaseResourceRequirementsBy(increment);
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               backingSlotPool.decreaseResourceRequirementsBy(decrement);
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               return backingSlotPool.getResourceRequirements();
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(Collection<? extends SlotOffer> 
offers, TaskManagerLocation taskManagerLocation, TaskManagerGateway 
taskManagerGateway, long currentTime) {
+               return backingSlotPool.offerSlots(offers, taskManagerLocation, 
taskManagerGateway, currentTime);
+       }
+
+       @Override
+       public Collection<SlotInfoWithUtilization> getFreeSlotsInformation() {
+               return backingSlotPool.getFreeSlotsInformation();
+       }
+
+       @Override
+       public Collection<? extends SlotInfo> getAllSlotsInformation() {
+               return backingSlotPool.getAllSlotsInformation();
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               return backingSlotPool.reserveFreeSlot(allocationId, 
requiredSlotProfile);
+       }
+
+       @Override
+       public Optional<ResourceCounter> freeReservedSlot(AllocationID 
allocationId, @Nullable Throwable cause, long currentTime) {
+               Optional<ResourceCounter> previouslyFulfilledRequirement = 
backingSlotPool.freeReservedSlot(allocationId, cause, currentTime);
+               previouslyFulfilledRequirement
+                       
.ifPresent(backingSlotPool::decreaseResourceRequirementsBy);
+               return previouslyFulfilledRequirement;
+       }
+
+       @Override
+       public ResourceCounter releaseSlots(ResourceID owner, Exception cause) {
+               ResourceCounter previouslyFulfilledRequirement = 
backingSlotPool.releaseSlots(owner, cause);
+
+               
backingSlotPool.decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
+               return previouslyFulfilledRequirement;
+       }

Review comment:
       Why are we decreasing the requirements if `releaseSlots` is called? I 
think this should be called if a TM's heartbeat times out.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;

Review comment:
       I have to admit that I prefer it as is because of the following reasons: 
Making the `ResourceCounter` immutable simplifies its operations (e.g. adding 
or subtracting will result into a new value). Moreover, in this class it is 
correctly shown that the `totalResourceRequirements` field is mutable. If it 
were `final` then it would not be obvious whether this field represents a 
static value or can be changed.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<SlotOffer> candidates = new ArrayList<>();
+
+               // filter out already accepted offers
+               for (SlotOffer offer : offers) {
+                       final AllocationID allocationId = 
offer.getAllocationId();
+                       if (slotPool.containsSlot(allocationId)) {
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               candidates.add(offer);
+                       }
+               }
+
+               final Collection<SlotOfferMatching> matchings = 
matchOffersWithOutstandingRequirements(candidates);
+
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+               ResourceCounter acceptedResources = ResourceCounter.empty();
+
+               for (SlotOfferMatching matching : matchings) {
+                       if (matching.getMatching().isPresent()) {
+                               final ResourceProfile matchedResourceProfile = 
matching.getMatching().get();
+
+                               final AllocatedSlot allocatedSlot = 
createAllocatedSlot(
+                                       matching.getSlotOffer(),
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                               acceptedSlots.add(allocatedSlot);
+                               acceptedSlotOffers.add(matching.getSlotOffer());
+
+                               acceptedResources = 
acceptedResources.add(matchedResourceProfile, 1);
+
+                               // store the ResourceProfile against which the 
given slot has matched for future book-keeping
+                               
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
matchedResourceProfile);
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+               increaseAvailableResources(acceptedResources);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Collection<SlotOfferMatching> 
matchOffersWithOutstandingRequirements(Collection<SlotOffer> slotOffers) {
+               ResourceCounter unfulfilledResources = 
calculateUnfulfilledResources();
+
+               final Collection<SlotOfferMatching> matching = new 
ArrayList<>();
+
+               for (SlotOffer slotOffer : slotOffers) {
+                       ResourceProfile matchingResourceProfile = null;
+
+                       if 
(unfulfilledResources.containsResource(slotOffer.getResourceProfile())) {
+                               unfulfilledResources = 
unfulfilledResources.subtract(slotOffer.getResourceProfile(), 1);
+
+                               // use the profile of the slot offer since it 
is a direct match against the requirement
+                               matchingResourceProfile = 
slotOffer.getResourceProfile();
+                       } else {
+                               for (ResourceProfile unfulfilledResource : 
unfulfilledResources.getResources()) {
+                                       if 
(slotOffer.getResourceProfile().isMatching(unfulfilledResource)) {
+                                               matchingResourceProfile = 
unfulfilledResource;
+                                               break;
+                                       }
+                               }
+                       }
+
+                       final SlotOfferMatching slotOfferMatching;
+                       if (matchingResourceProfile != null) {
+                               slotOfferMatching = 
SlotOfferMatching.createMatching(slotOffer, matchingResourceProfile);
+                       } else {
+                               slotOfferMatching = 
SlotOfferMatching.createMismatch(slotOffer);
+                       }
+
+                       matching.add(slotOfferMatching);
+               }
+
+               return matching;
+       }
+
+       @VisibleForTesting
+       ResourceCounter calculateUnfulfilledResources() {
+               return 
totalResourceRequirements.subtract(fulfilledResourceRequirements);
+       }
+
+       private AllocatedSlot createAllocatedSlot(
+               SlotOffer slotOffer,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway) {
+               return new AllocatedSlot(
+                       slotOffer.getAllocationId(),
+                       taskManagerLocation,
+                       slotOffer.getSlotIndex(),
+                       slotOffer.getResourceProfile(),
+                       taskManagerGateway);
+       }
+
+       private void increaseAvailableResources(ResourceCounter 
acceptedResources) {
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(acceptedResources);
+       }
+
+       @Nonnull
+       private ResourceProfile getMatchingResourceProfile(AllocationID 
allocationId) {
+               return 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), 
"No matching resource profile found for %s", allocationId);
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               final AllocatedSlot allocatedSlot = 
slotPool.reserveFreeSlot(allocationId);
+
+               
Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
 "");
+
+               ResourceProfile previouslyMatchedResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));
+
+               updateSlotToRequirementProfileMapping(allocationId, 
requiredSlotProfile);

Review comment:
       I really like to rethink whether we need this mapping in this class or 
whether it could live in the wrapper. It somehow feels wrong that the 
`DefaultDeclarativeSlotPool` needs to track for which `ResourceProfile` a given 
slot has been reserved.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to