tillrohrmann commented on a change in pull request #13722:
URL: https://github.com/apache/flink/pull/13722#discussion_r524345947



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
+import org.apache.flink.runtime.slots.RequirementMatcher;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.

Review comment:
       ```suggestion
    * {@link #freeReservedSlot freed}.
   ```
   
   bc release is used in the API and means that we remove the slot from the 
pool.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
+import org.apache.flink.runtime.slots.RequirementMatcher;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       private final RequirementMatcher requirementMatcher = new 
DefaultRequirementMatcher();
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+
+               for (SlotOffer offer : offers) {
+                       if (slotPool.containsSlot(offer.getAllocationId())) {
+                               // we have already accepted this offer
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               Optional<AllocatedSlot> acceptedSlot = 
matchOfferWithOutstandingRequirements(offer, taskManagerLocation, 
taskManagerGateway);
+                               if (acceptedSlot.isPresent()) {
+                                       acceptedSlotOffers.add(offer);

Review comment:
       We could log on debug which offers are matching with which requirements.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
+import org.apache.flink.runtime.slots.RequirementMatcher;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       private final RequirementMatcher requirementMatcher = new 
DefaultRequirementMatcher();
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+
+               for (SlotOffer offer : offers) {
+                       if (slotPool.containsSlot(offer.getAllocationId())) {
+                               // we have already accepted this offer
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               Optional<AllocatedSlot> acceptedSlot = 
matchOfferWithOutstandingRequirements(offer, taskManagerLocation, 
taskManagerGateway);
+                               if (acceptedSlot.isPresent()) {
+                                       acceptedSlotOffers.add(offer);
+                                       acceptedSlots.add(acceptedSlot.get());
+                               }
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(
+                       SlotOffer slotOffer,
+                       TaskManagerLocation taskManagerLocation,
+                       TaskManagerGateway taskManagerGateway) {
+
+               final Optional<ResourceProfile> match = 
requirementMatcher.match(
+                               slotOffer.getResourceProfile(),
+                               
totalResourceRequirements.getResourcesWithCount(),
+                               
fulfilledResourceRequirements::getResourceCount);
+
+               if (match.isPresent()) {
+                       
increaseAvailableResources(ResourceCounter.withResource(match.get(), 1));
+
+                       final AllocatedSlot allocatedSlot = createAllocatedSlot(
+                                       slotOffer,
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                       // store the ResourceProfile against which the given 
slot has matched for future book-keeping
+                       
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
match.get());
+
+                       return Optional.of(allocatedSlot);
+               }
+               return Optional.empty();
+       }
+
+       @VisibleForTesting
+       ResourceCounter calculateUnfulfilledResources() {
+               return 
totalResourceRequirements.subtract(fulfilledResourceRequirements);
+       }
+
+       private AllocatedSlot createAllocatedSlot(
+               SlotOffer slotOffer,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway) {
+               return new AllocatedSlot(
+                       slotOffer.getAllocationId(),
+                       taskManagerLocation,
+                       slotOffer.getSlotIndex(),
+                       slotOffer.getResourceProfile(),
+                       taskManagerGateway);
+       }
+
+       private void increaseAvailableResources(ResourceCounter 
acceptedResources) {
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(acceptedResources);
+       }
+
+       @Nonnull
+       private ResourceProfile getMatchingResourceProfile(AllocationID 
allocationId) {
+               return 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), 
"No matching resource profile found for %s", allocationId);
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               final AllocatedSlot allocatedSlot = 
slotPool.reserveFreeSlot(allocationId);
+
+               
Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
 "");
+
+               ResourceProfile previouslyMatchedResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));
+
+               if 
(!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) {
+                       // slots can be reserved for a requirement that is not 
in line with the mapping we computed when the slot was
+                       // offered, so we have to update the mapping adjust the 
requirements accordingly to ensure we still request enough slots to
+                       // be able to fulfill the total requirements

Review comment:
       Add debug logging which tells about the requirements adjustment.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
+import org.apache.flink.runtime.slots.RequirementMatcher;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       private final RequirementMatcher requirementMatcher = new 
DefaultRequirementMatcher();
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+
+               for (SlotOffer offer : offers) {
+                       if (slotPool.containsSlot(offer.getAllocationId())) {
+                               // we have already accepted this offer
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               Optional<AllocatedSlot> acceptedSlot = 
matchOfferWithOutstandingRequirements(offer, taskManagerLocation, 
taskManagerGateway);
+                               if (acceptedSlot.isPresent()) {
+                                       acceptedSlotOffers.add(offer);
+                                       acceptedSlots.add(acceptedSlot.get());
+                               }
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(
+                       SlotOffer slotOffer,
+                       TaskManagerLocation taskManagerLocation,
+                       TaskManagerGateway taskManagerGateway) {
+
+               final Optional<ResourceProfile> match = 
requirementMatcher.match(
+                               slotOffer.getResourceProfile(),
+                               
totalResourceRequirements.getResourcesWithCount(),
+                               
fulfilledResourceRequirements::getResourceCount);
+
+               if (match.isPresent()) {
+                       
increaseAvailableResources(ResourceCounter.withResource(match.get(), 1));
+
+                       final AllocatedSlot allocatedSlot = createAllocatedSlot(
+                                       slotOffer,
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                       // store the ResourceProfile against which the given 
slot has matched for future book-keeping
+                       
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
match.get());
+
+                       return Optional.of(allocatedSlot);
+               }
+               return Optional.empty();
+       }
+
+       @VisibleForTesting
+       ResourceCounter calculateUnfulfilledResources() {
+               return 
totalResourceRequirements.subtract(fulfilledResourceRequirements);
+       }
+
+       private AllocatedSlot createAllocatedSlot(
+               SlotOffer slotOffer,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway) {
+               return new AllocatedSlot(
+                       slotOffer.getAllocationId(),
+                       taskManagerLocation,
+                       slotOffer.getSlotIndex(),
+                       slotOffer.getResourceProfile(),
+                       taskManagerGateway);
+       }
+
+       private void increaseAvailableResources(ResourceCounter 
acceptedResources) {
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(acceptedResources);
+       }
+
+       @Nonnull
+       private ResourceProfile getMatchingResourceProfile(AllocationID 
allocationId) {
+               return 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), 
"No matching resource profile found for %s", allocationId);
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               final AllocatedSlot allocatedSlot = 
slotPool.reserveFreeSlot(allocationId);
+
+               
Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
 "");
+
+               ResourceProfile previouslyMatchedResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));
+
+               if 
(!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) {
+                       // slots can be reserved for a requirement that is not 
in line with the mapping we computed when the slot was
+                       // offered, so we have to update the mapping adjust the 
requirements accordingly to ensure we still request enough slots to
+                       // be able to fulfill the total requirements
+                       updateSlotToRequirementProfileMapping(allocationId, 
requiredSlotProfile);
+                       adjustRequirements(previouslyMatchedResourceProfile, 
requiredSlotProfile);
+               }
+
+               return allocatedSlot;
+       }
+
+       @Override
+       public ResourceCounter freeReservedSlot(AllocationID allocationId, 
@Nullable Throwable cause, long currentTime) {
+               LOG.debug("Release slot {}.", allocationId);

Review comment:
       Let's be consistent with the terms and use `Free reserved slot {}.` here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link DefaultDeclarativeSlotPool}.
+ */
+public class DefaultDeclarativeSlotPoolTest extends TestLogger {
+
+       private static final ResourceProfile RESOURCE_PROFILE_1 = 
ResourceProfile.newBuilder().setCpuCores(1.7).build();
+       private static final ResourceProfile RESOURCE_PROFILE_2 = 
ResourceProfile.newBuilder().setManagedMemoryMB(100).build();
+
+       @Test
+       public void 
testIncreasingResourceRequirementsWillSendResourceRequirementNotification() 
throws InterruptedException {
+               final NewResourceRequirementsService requirementsListener = new 
NewResourceRequirementsService();
+               final DeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(requirementsListener);
+
+               final ResourceCounter increment1 = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
+               final ResourceCounter increment2 = createResourceRequirements();
+               slotPool.increaseResourceRequirementsBy(increment1);
+               slotPool.increaseResourceRequirementsBy(increment2);
+
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(increment1)));
+
+               final ResourceCounter totalResources = 
increment1.add(increment2);
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(totalResources)));
+               assertThat(requirementsListener.hasNextResourceRequirements(), 
is(false));
+       }
+
+       @Test
+       public void 
testDecreasingResourceRequirementsWillSendResourceRequirementNotification() 
throws InterruptedException {
+               final NewResourceRequirementsService requirementsListener = new 
NewResourceRequirementsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(requirementsListener);
+
+               final ResourceCounter increment = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 3);
+               slotPool.increaseResourceRequirementsBy(increment);
+
+               requirementsListener.takeResourceRequirements();
+
+               final ResourceCounter decrement = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
+               slotPool.decreaseResourceRequirementsBy(decrement);
+
+               final ResourceCounter totalResources = 
increment.subtract(decrement);
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(totalResources)));
+               assertThat(requirementsListener.hasNextResourceRequirements(), 
is(false));
+       }
+
+       @Test
+       public void testGetResourceRequirements() {
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder().build();
+
+               assertThat(slotPool.getResourceRequirements(), 
is(toResourceRequirements(ResourceCounter.empty())));
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               assertThat(slotPool.getResourceRequirements(), 
is(toResourceRequirements(resourceRequirements)));
+       }
+
+       @Test
+       public void testOfferSlots() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               assertThat(acceptedSlots, 
containsInAnyOrder(slotOffers.toArray()));
+
+               final Collection<PhysicalSlot> newSlots = 
drainNewSlotService(notifyNewSlots);
+
+               assertThat(newSlots, 
containsInAnyOrder(slotOffers.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList())));
+               assertThat(slotPool.getAllSlotsInformation(), 
containsInAnyOrder(newSlots.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList())));
+       }
+
+       @Test
+       public void testDuplicateSlotOfferings() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               offerSlots(slotPool, slotOffers);
+
+               drainNewSlotService(notifyNewSlots);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               assertThat(acceptedSlots, 
containsInAnyOrder(slotOffers.toArray()));
+               // duplicate slots should not trigger notify new slots
+               assertFalse(notifyNewSlots.hasNextNewSlots());
+       }
+
+       @Test
+       public void testOfferingTooManySlots() {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               final ResourceCounter increasedRequirements = 
resourceRequirements.add(RESOURCE_PROFILE_1, 2);
+
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(increasedRequirements);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               final Map<ResourceProfile, Long> resourceProfileCount = 
acceptedSlots.stream().map(SlotOffer::getResourceProfile).collect(Collectors.groupingBy(Function.identity(),
 Collectors.counting()));
+
+               for (Map.Entry<ResourceProfile, Integer> resourceCount : 
resourceRequirements.getResourcesWithCount()) {
+                       
assertThat(resourceProfileCount.getOrDefault(resourceCount.getKey(), 0L), 
is((long) resourceCount.getValue()));
+               }
+       }
+
+       @Test
+       public void testReleaseSlotsRemovesSlots() throws InterruptedException {
+               final NewResourceRequirementsService 
notifyNewResourceRequirements = new NewResourceRequirementsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewResourceRequirements);
+
+               final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               increaseRequirementsAndOfferSlotsToSlotPool(slotPool, 
createResourceRequirements(), taskManagerLocation);
+
+               notifyNewResourceRequirements.takeResourceRequirements();
+
+               slotPool.releaseSlots(taskManagerLocation.getResourceID(), new 
FlinkException("Test failure"));
+               assertThat(slotPool.getAllSlotsInformation(), is(empty()));
+       }
+
+       @Test
+       public void testReleaseSlotsReturnsSlot() {
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder().build();
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               final FreeSlotConsumer freeSlotConsumer = new 
FreeSlotConsumer();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(freeSlotConsumer)
+                       .createTestingTaskExecutorGateway();
+
+               final Collection<SlotOffer> slotOffers = 
increaseRequirementsAndOfferSlotsToSlotPool(
+                       slotPool,
+                       resourceRequirements,
+                       taskManagerLocation,
+                       testingTaskExecutorGateway);
+
+               slotPool.releaseSlots(taskManagerLocation.getResourceID(), new 
FlinkException("Test failure"));
+
+               final Collection<AllocationID> freedSlots = 
freeSlotConsumer.drainFreedSlots();
+
+               assertThat(freedSlots, 
containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
+       }
+
+       @Test
+       public void testFailSlotDecreasesResources() throws 
InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+               increaseRequirementsAndOfferSlotsToSlotPool(slotPool, 
resourceRequirements, null);
+
+               final Collection<? extends PhysicalSlot> physicalSlots = 
notifyNewSlots.takeNewSlots();
+
+               final PhysicalSlot physicalSlot = 
physicalSlots.iterator().next();
+
+               slotPool.releaseSlot(physicalSlot.getAllocationId(), new 
FlinkException("Test failure"));
+
+               final ResourceCounter finalResourceRequirements = 
resourceRequirements.subtract(physicalSlot.getResourceProfile(), 1);
+               assertThat(slotPool.getFulfilledResourceRequirements(), 
is(finalResourceRequirements));
+       }
+
+       @Test
+       public void testFailSlotReturnsSlot() throws InterruptedException {

Review comment:
       `testReleaseSlotReturnsSlot`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link DefaultDeclarativeSlotPool}.
+ */
+public class DefaultDeclarativeSlotPoolTest extends TestLogger {
+
+       private static final ResourceProfile RESOURCE_PROFILE_1 = 
ResourceProfile.newBuilder().setCpuCores(1.7).build();
+       private static final ResourceProfile RESOURCE_PROFILE_2 = 
ResourceProfile.newBuilder().setManagedMemoryMB(100).build();
+
+       @Test
+       public void 
testIncreasingResourceRequirementsWillSendResourceRequirementNotification() 
throws InterruptedException {
+               final NewResourceRequirementsService requirementsListener = new 
NewResourceRequirementsService();
+               final DeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(requirementsListener);
+
+               final ResourceCounter increment1 = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
+               final ResourceCounter increment2 = createResourceRequirements();
+               slotPool.increaseResourceRequirementsBy(increment1);
+               slotPool.increaseResourceRequirementsBy(increment2);
+
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(increment1)));
+
+               final ResourceCounter totalResources = 
increment1.add(increment2);
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(totalResources)));
+               assertThat(requirementsListener.hasNextResourceRequirements(), 
is(false));
+       }
+
+       @Test
+       public void 
testDecreasingResourceRequirementsWillSendResourceRequirementNotification() 
throws InterruptedException {
+               final NewResourceRequirementsService requirementsListener = new 
NewResourceRequirementsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(requirementsListener);
+
+               final ResourceCounter increment = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 3);
+               slotPool.increaseResourceRequirementsBy(increment);
+
+               requirementsListener.takeResourceRequirements();
+
+               final ResourceCounter decrement = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
+               slotPool.decreaseResourceRequirementsBy(decrement);
+
+               final ResourceCounter totalResources = 
increment.subtract(decrement);
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(totalResources)));
+               assertThat(requirementsListener.hasNextResourceRequirements(), 
is(false));
+       }
+
+       @Test
+       public void testGetResourceRequirements() {
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder().build();
+
+               assertThat(slotPool.getResourceRequirements(), 
is(toResourceRequirements(ResourceCounter.empty())));
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               assertThat(slotPool.getResourceRequirements(), 
is(toResourceRequirements(resourceRequirements)));
+       }
+
+       @Test
+       public void testOfferSlots() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               assertThat(acceptedSlots, 
containsInAnyOrder(slotOffers.toArray()));
+
+               final Collection<PhysicalSlot> newSlots = 
drainNewSlotService(notifyNewSlots);
+
+               assertThat(newSlots, 
containsInAnyOrder(slotOffers.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList())));
+               assertThat(slotPool.getAllSlotsInformation(), 
containsInAnyOrder(newSlots.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList())));
+       }
+
+       @Test
+       public void testDuplicateSlotOfferings() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               offerSlots(slotPool, slotOffers);
+
+               drainNewSlotService(notifyNewSlots);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               assertThat(acceptedSlots, 
containsInAnyOrder(slotOffers.toArray()));
+               // duplicate slots should not trigger notify new slots
+               assertFalse(notifyNewSlots.hasNextNewSlots());
+       }
+
+       @Test
+       public void testOfferingTooManySlots() {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               final ResourceCounter increasedRequirements = 
resourceRequirements.add(RESOURCE_PROFILE_1, 2);
+
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(increasedRequirements);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               final Map<ResourceProfile, Long> resourceProfileCount = 
acceptedSlots.stream().map(SlotOffer::getResourceProfile).collect(Collectors.groupingBy(Function.identity(),
 Collectors.counting()));
+
+               for (Map.Entry<ResourceProfile, Integer> resourceCount : 
resourceRequirements.getResourcesWithCount()) {
+                       
assertThat(resourceProfileCount.getOrDefault(resourceCount.getKey(), 0L), 
is((long) resourceCount.getValue()));
+               }
+       }
+
+       @Test
+       public void testReleaseSlotsRemovesSlots() throws InterruptedException {
+               final NewResourceRequirementsService 
notifyNewResourceRequirements = new NewResourceRequirementsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewResourceRequirements);
+
+               final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               increaseRequirementsAndOfferSlotsToSlotPool(slotPool, 
createResourceRequirements(), taskManagerLocation);
+
+               notifyNewResourceRequirements.takeResourceRequirements();
+
+               slotPool.releaseSlots(taskManagerLocation.getResourceID(), new 
FlinkException("Test failure"));
+               assertThat(slotPool.getAllSlotsInformation(), is(empty()));
+       }
+
+       @Test
+       public void testReleaseSlotsReturnsSlot() {
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder().build();
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               final FreeSlotConsumer freeSlotConsumer = new 
FreeSlotConsumer();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(freeSlotConsumer)
+                       .createTestingTaskExecutorGateway();
+
+               final Collection<SlotOffer> slotOffers = 
increaseRequirementsAndOfferSlotsToSlotPool(
+                       slotPool,
+                       resourceRequirements,
+                       taskManagerLocation,
+                       testingTaskExecutorGateway);
+
+               slotPool.releaseSlots(taskManagerLocation.getResourceID(), new 
FlinkException("Test failure"));
+
+               final Collection<AllocationID> freedSlots = 
freeSlotConsumer.drainFreedSlots();
+
+               assertThat(freedSlots, 
containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
+       }
+
+       @Test
+       public void testFailSlotDecreasesResources() throws 
InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+               increaseRequirementsAndOfferSlotsToSlotPool(slotPool, 
resourceRequirements, null);
+
+               final Collection<? extends PhysicalSlot> physicalSlots = 
notifyNewSlots.takeNewSlots();
+
+               final PhysicalSlot physicalSlot = 
physicalSlots.iterator().next();
+
+               slotPool.releaseSlot(physicalSlot.getAllocationId(), new 
FlinkException("Test failure"));
+
+               final ResourceCounter finalResourceRequirements = 
resourceRequirements.subtract(physicalSlot.getResourceProfile(), 1);
+               assertThat(slotPool.getFulfilledResourceRequirements(), 
is(finalResourceRequirements));
+       }
+
+       @Test
+       public void testFailSlotReturnsSlot() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+               final FreeSlotConsumer freeSlotConsumer = new 
FreeSlotConsumer();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(freeSlotConsumer)
+                       .createTestingTaskExecutorGateway();
+
+               increaseRequirementsAndOfferSlotsToSlotPool(
+                       slotPool,
+                       resourceRequirements,
+                       new LocalTaskManagerLocation(),
+                       testingTaskExecutorGateway);
+
+               final Collection<? extends PhysicalSlot> physicalSlots = 
notifyNewSlots.takeNewSlots();
+
+               final PhysicalSlot physicalSlot = 
physicalSlots.iterator().next();
+
+               slotPool.releaseSlot(physicalSlot.getAllocationId(), new 
FlinkException("Test failure"));
+
+               final AllocationID freedSlot = 
Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots());
+
+               assertThat(freedSlot, is(physicalSlot.getAllocationId()));
+       }
+
+       @Test
+       public void testReturnIdleSlotsAfterTimeout() {
+               final Time idleSlotTimeout = Time.seconds(10);
+               final long offerTime = 0;
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder()
+                       .setIdleSlotTimeout(idleSlotTimeout)
+                       .build();
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+               final FreeSlotConsumer freeSlotConsumer = new 
FreeSlotConsumer();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(freeSlotConsumer)
+                       .createTestingTaskExecutorGateway();
+
+               final Collection<SlotOffer> acceptedSlots = 
increaseRequirementsAndOfferSlotsToSlotPool(
+                       slotPool,
+                       resourceRequirements,
+                       new LocalTaskManagerLocation(),
+                       testingTaskExecutorGateway);
+
+               // decrease the resource requirements so that slots are no 
longer needed
+               slotPool.decreaseResourceRequirementsBy(resourceRequirements);
+
+               slotPool.releaseIdleSlots(offerTime + 
idleSlotTimeout.toMilliseconds());
+
+               final Collection<AllocationID> freedSlots = 
freeSlotConsumer.drainFreedSlots();
+
+               assertThat(acceptedSlots, is(not(empty())));
+               assertThat(freedSlots, 
containsInAnyOrder(acceptedSlots.stream().map(SlotOffer::getAllocationId).toArray()));
+               assertNoAvailableAndRequiredResources(slotPool);
+       }
+
+       private void 
assertNoAvailableAndRequiredResources(DefaultDeclarativeSlotPool slotPool) {
+               
assertTrue(slotPool.getFulfilledResourceRequirements().isEmpty());
+               assertTrue(slotPool.getResourceRequirements().isEmpty());
+               assertThat(slotPool.getAllSlotsInformation(), is(empty()));
+       }
+
+       @Test
+       public void testOnlyReturnExcessIdleSlots() {
+               final Time idleSlotTimeout = Time.seconds(10);
+               final long offerTime = 0;
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder()
+                       .setIdleSlotTimeout(idleSlotTimeout)
+                       .build();
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               final ResourceCounter requiredResources = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
+               final ResourceCounter excessRequirements = 
resourceRequirements.subtract(requiredResources);
+               slotPool.decreaseResourceRequirementsBy(excessRequirements);
+
+               slotPool.releaseIdleSlots(offerTime + 
idleSlotTimeout.toMilliseconds());
+
+               assertThat(acceptedSlots, is(not(empty())));
+               assertThat(slotPool.getFulfilledResourceRequirements(), 
is(requiredResources));
+       }
+
+       @Test
+       public void 
testReleasedSlotWillBeUsedToFulfillOutstandingResourceRequirements() throws 
InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter initialRequirements = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
+
+               increaseRequirementsAndOfferSlotsToSlotPool(slotPool, 
initialRequirements, null);
+
+               final Collection<PhysicalSlot> newSlots = 
drainNewSlotService(notifyNewSlots);
+               final PhysicalSlot newSlot = Iterables.getOnlyElement(newSlots);
+
+               slotPool.reserveFreeSlot(newSlot.getAllocationId(), 
RESOURCE_PROFILE_1);
+               slotPool.freeReservedSlot(newSlot.getAllocationId(), null, 0);
+
+               final Collection<PhysicalSlot> recycledSlots = 
drainNewSlotService(notifyNewSlots);
+
+               assertThat(Iterables.getOnlyElement(recycledSlots), 
sameInstance(newSlot));
+
+               final Collection<SlotOffer> newSlotOffers = 
createSlotOffersForResourceRequirements(initialRequirements);
+
+               // the pending requirement should be fulfilled by the released 
slot --> rejecting new slot offers

Review comment:
       ```suggestion
                // the pending requirement should be fulfilled by the freed 
slot --> rejecting new slot offers
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
+import org.apache.flink.runtime.slots.RequirementMatcher;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       private final RequirementMatcher requirementMatcher = new 
DefaultRequirementMatcher();
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+
+               for (SlotOffer offer : offers) {
+                       if (slotPool.containsSlot(offer.getAllocationId())) {
+                               // we have already accepted this offer
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               Optional<AllocatedSlot> acceptedSlot = 
matchOfferWithOutstandingRequirements(offer, taskManagerLocation, 
taskManagerGateway);
+                               if (acceptedSlot.isPresent()) {
+                                       acceptedSlotOffers.add(offer);
+                                       acceptedSlots.add(acceptedSlot.get());
+                               }
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(
+                       SlotOffer slotOffer,
+                       TaskManagerLocation taskManagerLocation,
+                       TaskManagerGateway taskManagerGateway) {
+
+               final Optional<ResourceProfile> match = 
requirementMatcher.match(
+                               slotOffer.getResourceProfile(),
+                               
totalResourceRequirements.getResourcesWithCount(),
+                               
fulfilledResourceRequirements::getResourceCount);
+
+               if (match.isPresent()) {
+                       
increaseAvailableResources(ResourceCounter.withResource(match.get(), 1));
+
+                       final AllocatedSlot allocatedSlot = createAllocatedSlot(
+                                       slotOffer,
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                       // store the ResourceProfile against which the given 
slot has matched for future book-keeping
+                       
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
match.get());
+
+                       return Optional.of(allocatedSlot);
+               }
+               return Optional.empty();
+       }
+
+       @VisibleForTesting
+       ResourceCounter calculateUnfulfilledResources() {
+               return 
totalResourceRequirements.subtract(fulfilledResourceRequirements);
+       }
+
+       private AllocatedSlot createAllocatedSlot(
+               SlotOffer slotOffer,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway) {
+               return new AllocatedSlot(
+                       slotOffer.getAllocationId(),
+                       taskManagerLocation,
+                       slotOffer.getSlotIndex(),
+                       slotOffer.getResourceProfile(),
+                       taskManagerGateway);
+       }
+
+       private void increaseAvailableResources(ResourceCounter 
acceptedResources) {
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(acceptedResources);
+       }
+
+       @Nonnull
+       private ResourceProfile getMatchingResourceProfile(AllocationID 
allocationId) {
+               return 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), 
"No matching resource profile found for %s", allocationId);
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               final AllocatedSlot allocatedSlot = 
slotPool.reserveFreeSlot(allocationId);
+
+               
Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
 "");

Review comment:
       The error message is missing.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link DefaultDeclarativeSlotPool}.
+ */
+public class DefaultDeclarativeSlotPoolTest extends TestLogger {
+
+       private static final ResourceProfile RESOURCE_PROFILE_1 = 
ResourceProfile.newBuilder().setCpuCores(1.7).build();
+       private static final ResourceProfile RESOURCE_PROFILE_2 = 
ResourceProfile.newBuilder().setManagedMemoryMB(100).build();
+
+       @Test
+       public void 
testIncreasingResourceRequirementsWillSendResourceRequirementNotification() 
throws InterruptedException {
+               final NewResourceRequirementsService requirementsListener = new 
NewResourceRequirementsService();
+               final DeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(requirementsListener);
+
+               final ResourceCounter increment1 = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
+               final ResourceCounter increment2 = createResourceRequirements();
+               slotPool.increaseResourceRequirementsBy(increment1);
+               slotPool.increaseResourceRequirementsBy(increment2);
+
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(increment1)));
+
+               final ResourceCounter totalResources = 
increment1.add(increment2);
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(totalResources)));
+               assertThat(requirementsListener.hasNextResourceRequirements(), 
is(false));
+       }
+
+       @Test
+       public void 
testDecreasingResourceRequirementsWillSendResourceRequirementNotification() 
throws InterruptedException {
+               final NewResourceRequirementsService requirementsListener = new 
NewResourceRequirementsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(requirementsListener);
+
+               final ResourceCounter increment = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 3);
+               slotPool.increaseResourceRequirementsBy(increment);
+
+               requirementsListener.takeResourceRequirements();
+
+               final ResourceCounter decrement = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
+               slotPool.decreaseResourceRequirementsBy(decrement);
+
+               final ResourceCounter totalResources = 
increment.subtract(decrement);
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(totalResources)));
+               assertThat(requirementsListener.hasNextResourceRequirements(), 
is(false));
+       }
+
+       @Test
+       public void testGetResourceRequirements() {
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder().build();
+
+               assertThat(slotPool.getResourceRequirements(), 
is(toResourceRequirements(ResourceCounter.empty())));
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               assertThat(slotPool.getResourceRequirements(), 
is(toResourceRequirements(resourceRequirements)));
+       }
+
+       @Test
+       public void testOfferSlots() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               assertThat(acceptedSlots, 
containsInAnyOrder(slotOffers.toArray()));
+
+               final Collection<PhysicalSlot> newSlots = 
drainNewSlotService(notifyNewSlots);
+
+               assertThat(newSlots, 
containsInAnyOrder(slotOffers.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList())));
+               assertThat(slotPool.getAllSlotsInformation(), 
containsInAnyOrder(newSlots.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList())));
+       }
+
+       @Test
+       public void testDuplicateSlotOfferings() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               offerSlots(slotPool, slotOffers);
+
+               drainNewSlotService(notifyNewSlots);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               assertThat(acceptedSlots, 
containsInAnyOrder(slotOffers.toArray()));
+               // duplicate slots should not trigger notify new slots
+               assertFalse(notifyNewSlots.hasNextNewSlots());
+       }
+
+       @Test
+       public void testOfferingTooManySlots() {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               final ResourceCounter increasedRequirements = 
resourceRequirements.add(RESOURCE_PROFILE_1, 2);
+
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(increasedRequirements);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               final Map<ResourceProfile, Long> resourceProfileCount = 
acceptedSlots.stream().map(SlotOffer::getResourceProfile).collect(Collectors.groupingBy(Function.identity(),
 Collectors.counting()));
+
+               for (Map.Entry<ResourceProfile, Integer> resourceCount : 
resourceRequirements.getResourcesWithCount()) {
+                       
assertThat(resourceProfileCount.getOrDefault(resourceCount.getKey(), 0L), 
is((long) resourceCount.getValue()));
+               }
+       }
+
+       @Test
+       public void testReleaseSlotsRemovesSlots() throws InterruptedException {
+               final NewResourceRequirementsService 
notifyNewResourceRequirements = new NewResourceRequirementsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewResourceRequirements);
+
+               final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               increaseRequirementsAndOfferSlotsToSlotPool(slotPool, 
createResourceRequirements(), taskManagerLocation);
+
+               notifyNewResourceRequirements.takeResourceRequirements();
+
+               slotPool.releaseSlots(taskManagerLocation.getResourceID(), new 
FlinkException("Test failure"));
+               assertThat(slotPool.getAllSlotsInformation(), is(empty()));
+       }
+
+       @Test
+       public void testReleaseSlotsReturnsSlot() {
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder().build();
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               final FreeSlotConsumer freeSlotConsumer = new 
FreeSlotConsumer();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(freeSlotConsumer)
+                       .createTestingTaskExecutorGateway();
+
+               final Collection<SlotOffer> slotOffers = 
increaseRequirementsAndOfferSlotsToSlotPool(
+                       slotPool,
+                       resourceRequirements,
+                       taskManagerLocation,
+                       testingTaskExecutorGateway);
+
+               slotPool.releaseSlots(taskManagerLocation.getResourceID(), new 
FlinkException("Test failure"));
+
+               final Collection<AllocationID> freedSlots = 
freeSlotConsumer.drainFreedSlots();
+
+               assertThat(freedSlots, 
containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
+       }
+
+       @Test
+       public void testFailSlotDecreasesResources() throws 
InterruptedException {

Review comment:
       Maybe better to call this test 
`testReleaseSlotDecreasesFulfilledResourceRequirements`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
+import org.apache.flink.runtime.slots.RequirementMatcher;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       private final RequirementMatcher requirementMatcher = new 
DefaultRequirementMatcher();
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+
+               for (SlotOffer offer : offers) {
+                       if (slotPool.containsSlot(offer.getAllocationId())) {
+                               // we have already accepted this offer
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               Optional<AllocatedSlot> acceptedSlot = 
matchOfferWithOutstandingRequirements(offer, taskManagerLocation, 
taskManagerGateway);
+                               if (acceptedSlot.isPresent()) {
+                                       acceptedSlotOffers.add(offer);
+                                       acceptedSlots.add(acceptedSlot.get());
+                               }
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(
+                       SlotOffer slotOffer,
+                       TaskManagerLocation taskManagerLocation,
+                       TaskManagerGateway taskManagerGateway) {
+
+               final Optional<ResourceProfile> match = 
requirementMatcher.match(
+                               slotOffer.getResourceProfile(),
+                               
totalResourceRequirements.getResourcesWithCount(),
+                               
fulfilledResourceRequirements::getResourceCount);
+
+               if (match.isPresent()) {
+                       
increaseAvailableResources(ResourceCounter.withResource(match.get(), 1));
+
+                       final AllocatedSlot allocatedSlot = createAllocatedSlot(
+                                       slotOffer,
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                       // store the ResourceProfile against which the given 
slot has matched for future book-keeping
+                       
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
match.get());
+
+                       return Optional.of(allocatedSlot);
+               }
+               return Optional.empty();
+       }
+
+       @VisibleForTesting
+       ResourceCounter calculateUnfulfilledResources() {
+               return 
totalResourceRequirements.subtract(fulfilledResourceRequirements);
+       }
+
+       private AllocatedSlot createAllocatedSlot(
+               SlotOffer slotOffer,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway) {
+               return new AllocatedSlot(
+                       slotOffer.getAllocationId(),
+                       taskManagerLocation,
+                       slotOffer.getSlotIndex(),
+                       slotOffer.getResourceProfile(),
+                       taskManagerGateway);
+       }
+
+       private void increaseAvailableResources(ResourceCounter 
acceptedResources) {
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(acceptedResources);
+       }
+
+       @Nonnull
+       private ResourceProfile getMatchingResourceProfile(AllocationID 
allocationId) {
+               return 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), 
"No matching resource profile found for %s", allocationId);
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               final AllocatedSlot allocatedSlot = 
slotPool.reserveFreeSlot(allocationId);
+
+               
Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
 "");
+
+               ResourceProfile previouslyMatchedResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));
+
+               if 
(!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) {
+                       // slots can be reserved for a requirement that is not 
in line with the mapping we computed when the slot was
+                       // offered, so we have to update the mapping adjust the 
requirements accordingly to ensure we still request enough slots to
+                       // be able to fulfill the total requirements
+                       updateSlotToRequirementProfileMapping(allocationId, 
requiredSlotProfile);
+                       adjustRequirements(previouslyMatchedResourceProfile, 
requiredSlotProfile);
+               }
+
+               return allocatedSlot;
+       }
+
+       @Override
+       public ResourceCounter freeReservedSlot(AllocationID allocationId, 
@Nullable Throwable cause, long currentTime) {
+               LOG.debug("Release slot {}.", allocationId);
+
+               final Optional<AllocatedSlot> releasedSlot = 
slotPool.freeReservedSlot(allocationId, currentTime);

Review comment:
       ```suggestion
                final Optional<AllocatedSlot> freedSlot = 
slotPool.freeReservedSlot(allocationId, currentTime);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
+import org.apache.flink.runtime.slots.RequirementMatcher;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       private final RequirementMatcher requirementMatcher = new 
DefaultRequirementMatcher();
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+
+               for (SlotOffer offer : offers) {
+                       if (slotPool.containsSlot(offer.getAllocationId())) {
+                               // we have already accepted this offer
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               Optional<AllocatedSlot> acceptedSlot = 
matchOfferWithOutstandingRequirements(offer, taskManagerLocation, 
taskManagerGateway);
+                               if (acceptedSlot.isPresent()) {
+                                       acceptedSlotOffers.add(offer);
+                                       acceptedSlots.add(acceptedSlot.get());
+                               }
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(
+                       SlotOffer slotOffer,
+                       TaskManagerLocation taskManagerLocation,
+                       TaskManagerGateway taskManagerGateway) {
+
+               final Optional<ResourceProfile> match = 
requirementMatcher.match(
+                               slotOffer.getResourceProfile(),
+                               
totalResourceRequirements.getResourcesWithCount(),
+                               
fulfilledResourceRequirements::getResourceCount);
+
+               if (match.isPresent()) {
+                       
increaseAvailableResources(ResourceCounter.withResource(match.get(), 1));
+
+                       final AllocatedSlot allocatedSlot = createAllocatedSlot(
+                                       slotOffer,
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                       // store the ResourceProfile against which the given 
slot has matched for future book-keeping
+                       
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
match.get());
+
+                       return Optional.of(allocatedSlot);
+               }
+               return Optional.empty();
+       }
+
+       @VisibleForTesting
+       ResourceCounter calculateUnfulfilledResources() {
+               return 
totalResourceRequirements.subtract(fulfilledResourceRequirements);
+       }
+
+       private AllocatedSlot createAllocatedSlot(
+               SlotOffer slotOffer,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway) {
+               return new AllocatedSlot(
+                       slotOffer.getAllocationId(),
+                       taskManagerLocation,
+                       slotOffer.getSlotIndex(),
+                       slotOffer.getResourceProfile(),
+                       taskManagerGateway);
+       }
+
+       private void increaseAvailableResources(ResourceCounter 
acceptedResources) {
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(acceptedResources);
+       }
+
+       @Nonnull
+       private ResourceProfile getMatchingResourceProfile(AllocationID 
allocationId) {
+               return 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), 
"No matching resource profile found for %s", allocationId);
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               final AllocatedSlot allocatedSlot = 
slotPool.reserveFreeSlot(allocationId);
+
+               
Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
 "");
+
+               ResourceProfile previouslyMatchedResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));
+
+               if 
(!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) {
+                       // slots can be reserved for a requirement that is not 
in line with the mapping we computed when the slot was
+                       // offered, so we have to update the mapping adjust the 
requirements accordingly to ensure we still request enough slots to
+                       // be able to fulfill the total requirements
+                       updateSlotToRequirementProfileMapping(allocationId, 
requiredSlotProfile);
+                       adjustRequirements(previouslyMatchedResourceProfile, 
requiredSlotProfile);
+               }
+
+               return allocatedSlot;
+       }
+
+       @Override
+       public ResourceCounter freeReservedSlot(AllocationID allocationId, 
@Nullable Throwable cause, long currentTime) {
+               LOG.debug("Release slot {}.", allocationId);
+
+               final Optional<AllocatedSlot> releasedSlot = 
slotPool.freeReservedSlot(allocationId, currentTime);
+
+               Optional<ResourceCounter> previouslyFulfilledRequirement = 
releasedSlot.map(Collections::singleton).map(this::getFulfilledRequirements);
+
+               releasedSlot.ifPresent(allocatedSlot -> {
+                       releasePayload(Collections.singleton(allocatedSlot), 
cause);
+                       tryToFulfillResourceRequirement(allocatedSlot);
+                       
notifyNewSlots.accept(Collections.singletonList(allocatedSlot));
+               });
+
+               return 
previouslyFulfilledRequirement.orElseGet(ResourceCounter::empty);
+       }
+
+       private void tryToFulfillResourceRequirement(AllocatedSlot 
allocatedSlot) {
+               
matchOfferWithOutstandingRequirements(allocatedSlotToSlotOffer(allocatedSlot), 
allocatedSlot.getTaskManagerLocation(), allocatedSlot.getTaskManagerGateway());
+       }
+
+       private void updateSlotToRequirementProfileMapping(AllocationID 
allocationId, ResourceProfile matchedResourceProfile) {
+               final ResourceProfile oldResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.put(allocationId, 
matchedResourceProfile), "Expected slot profile matching to be non-empty.");
+
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(matchedResourceProfile, 1);
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.subtract(oldResourceProfile, 1);
+       }
+
+       private void adjustRequirements(ResourceProfile oldResourceProfile, 
ResourceProfile newResourceProfile) {
+               // slots can be reserved for a requirement that is not in line 
with the mapping we computed when the slot was
+               // offered, so we have to adjust the requirements accordingly 
to ensure we still request enough slots to
+               // be able to fulfill the total requirements
+               
decreaseResourceRequirementsBy(ResourceCounter.withResource(newResourceProfile, 
1));
+               
increaseResourceRequirementsBy(ResourceCounter.withResource(oldResourceProfile, 
1));
+       }
+
+       @Nonnull
+       private SlotOffer allocatedSlotToSlotOffer(AllocatedSlot allocatedSlot) 
{
+               return new SlotOffer(allocatedSlot.getAllocationId(), 
allocatedSlot.getPhysicalSlotNumber(), allocatedSlot.getResourceProfile());
+       }
+
+       @Override
+       public ResourceCounter releaseSlots(ResourceID owner, Exception cause) {
+               final Collection<AllocatedSlot> removedSlots = 
slotPool.removeSlots(owner);
+
+               ResourceCounter previouslyFulfilledRequirements = 
getFulfilledRequirements(removedSlots);
+
+               releasePayload(removedSlots, cause);
+               releaseSlots(removedSlots, cause);
+
+               return previouslyFulfilledRequirements;
+       }
+
+       @Override
+       public ResourceCounter releaseSlot(AllocationID allocationId, Exception 
cause) {
+               final Optional<AllocatedSlot> removedSlot = 
slotPool.removeSlot(allocationId);
+
+               Optional<ResourceCounter> previouslyFulfilledRequirement = 
removedSlot.map(Collections::singleton).map(this::getFulfilledRequirements);
+
+               removedSlot.ifPresent(allocatedSlot -> {
+                       releasePayload(Collections.singleton(allocatedSlot), 
cause);
+                       releaseSlots(Collections.singleton(allocatedSlot), 
cause);
+               });
+
+               return 
previouslyFulfilledRequirement.orElseGet(ResourceCounter::empty);
+       }
+
+       private void releasePayload(Iterable<? extends AllocatedSlot> 
allocatedSlots, Throwable cause) {
+               for (AllocatedSlot allocatedSlot : allocatedSlots) {
+                       allocatedSlot.releasePayload(cause);
+               }
+       }
+
+       @Override
+       public void releaseIdleSlots(long currentTimeMillis) {
+               final Collection<AllocatedSlotPool.FreeSlotInfo> 
freeSlotsInformation = slotPool.getFreeSlotsInformation();
+
+               ResourceCounter excessResources = 
fulfilledResourceRequirements.subtract(totalResourceRequirements);
+
+               final Iterator<AllocatedSlotPool.FreeSlotInfo> freeSlotIterator 
= freeSlotsInformation.iterator();
+
+               final Collection<AllocatedSlot> slotsToReturnToOwner = new 
ArrayList<>();
+
+               while (!excessResources.isEmpty() && 
freeSlotIterator.hasNext()) {
+                       final AllocatedSlotPool.FreeSlotInfo idleSlot = 
freeSlotIterator.next();
+
+                       if (currentTimeMillis >= idleSlot.getFreeSince() + 
idleSlotTimeout.toMilliseconds()) {
+                               final ResourceProfile matchingProfile = 
getMatchingResourceProfile(idleSlot.getAllocationId());
+
+                               if 
(excessResources.containsResource(matchingProfile)) {
+                                       excessResources = 
excessResources.subtract(matchingProfile, 1);
+                                       final Optional<AllocatedSlot> 
removedSlot = slotPool.removeSlot(idleSlot.getAllocationId());
+
+                                       final AllocatedSlot allocatedSlot = 
removedSlot.orElseThrow(() -> new IllegalStateException(String.format("Could 
not find slot for allocation id %s.", idleSlot.getAllocationId())));
+                                       slotsToReturnToOwner.add(allocatedSlot);
+                               }
+                       }
+               }
+
+               releaseSlots(slotsToReturnToOwner, new 
FlinkException("Returning idle slots to their owners."));
+       }
+
+       private void releaseSlots(Iterable<AllocatedSlot> slotsToReturnToOwner, 
Throwable cause) {
+               for (AllocatedSlot slotToReturn : slotsToReturnToOwner) {
+                       Preconditions.checkState(!slotToReturn.isUsed(), "Free 
slot must not be used.");
+
+                       LOG.info("Releasing slot [{}].", 
slotToReturn.getAllocationId());

Review comment:
       maybe log on debug the `cause`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link DefaultDeclarativeSlotPool}.
+ */
+public class DefaultDeclarativeSlotPoolTest extends TestLogger {
+
+       private static final ResourceProfile RESOURCE_PROFILE_1 = 
ResourceProfile.newBuilder().setCpuCores(1.7).build();
+       private static final ResourceProfile RESOURCE_PROFILE_2 = 
ResourceProfile.newBuilder().setManagedMemoryMB(100).build();
+
+       @Test
+       public void 
testIncreasingResourceRequirementsWillSendResourceRequirementNotification() 
throws InterruptedException {
+               final NewResourceRequirementsService requirementsListener = new 
NewResourceRequirementsService();
+               final DeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(requirementsListener);
+
+               final ResourceCounter increment1 = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
+               final ResourceCounter increment2 = createResourceRequirements();
+               slotPool.increaseResourceRequirementsBy(increment1);
+               slotPool.increaseResourceRequirementsBy(increment2);
+
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(increment1)));
+
+               final ResourceCounter totalResources = 
increment1.add(increment2);
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(totalResources)));
+               assertThat(requirementsListener.hasNextResourceRequirements(), 
is(false));
+       }
+
+       @Test
+       public void 
testDecreasingResourceRequirementsWillSendResourceRequirementNotification() 
throws InterruptedException {
+               final NewResourceRequirementsService requirementsListener = new 
NewResourceRequirementsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(requirementsListener);
+
+               final ResourceCounter increment = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 3);
+               slotPool.increaseResourceRequirementsBy(increment);
+
+               requirementsListener.takeResourceRequirements();
+
+               final ResourceCounter decrement = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
+               slotPool.decreaseResourceRequirementsBy(decrement);
+
+               final ResourceCounter totalResources = 
increment.subtract(decrement);
+               assertThat(requirementsListener.takeResourceRequirements(), 
is(toResourceRequirements(totalResources)));
+               assertThat(requirementsListener.hasNextResourceRequirements(), 
is(false));
+       }
+
+       @Test
+       public void testGetResourceRequirements() {
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder().build();
+
+               assertThat(slotPool.getResourceRequirements(), 
is(toResourceRequirements(ResourceCounter.empty())));
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               assertThat(slotPool.getResourceRequirements(), 
is(toResourceRequirements(resourceRequirements)));
+       }
+
+       @Test
+       public void testOfferSlots() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               assertThat(acceptedSlots, 
containsInAnyOrder(slotOffers.toArray()));
+
+               final Collection<PhysicalSlot> newSlots = 
drainNewSlotService(notifyNewSlots);
+
+               assertThat(newSlots, 
containsInAnyOrder(slotOffers.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList())));
+               assertThat(slotPool.getAllSlotsInformation(), 
containsInAnyOrder(newSlots.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList())));
+       }
+
+       @Test
+       public void testDuplicateSlotOfferings() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               offerSlots(slotPool, slotOffers);
+
+               drainNewSlotService(notifyNewSlots);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               assertThat(acceptedSlots, 
containsInAnyOrder(slotOffers.toArray()));
+               // duplicate slots should not trigger notify new slots
+               assertFalse(notifyNewSlots.hasNextNewSlots());
+       }
+
+       @Test
+       public void testOfferingTooManySlots() {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+
+               final ResourceCounter increasedRequirements = 
resourceRequirements.add(RESOURCE_PROFILE_1, 2);
+
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(increasedRequirements);
+
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               final Map<ResourceProfile, Long> resourceProfileCount = 
acceptedSlots.stream().map(SlotOffer::getResourceProfile).collect(Collectors.groupingBy(Function.identity(),
 Collectors.counting()));
+
+               for (Map.Entry<ResourceProfile, Integer> resourceCount : 
resourceRequirements.getResourcesWithCount()) {
+                       
assertThat(resourceProfileCount.getOrDefault(resourceCount.getKey(), 0L), 
is((long) resourceCount.getValue()));
+               }
+       }
+
+       @Test
+       public void testReleaseSlotsRemovesSlots() throws InterruptedException {
+               final NewResourceRequirementsService 
notifyNewResourceRequirements = new NewResourceRequirementsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewResourceRequirements);
+
+               final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               increaseRequirementsAndOfferSlotsToSlotPool(slotPool, 
createResourceRequirements(), taskManagerLocation);
+
+               notifyNewResourceRequirements.takeResourceRequirements();
+
+               slotPool.releaseSlots(taskManagerLocation.getResourceID(), new 
FlinkException("Test failure"));
+               assertThat(slotPool.getAllSlotsInformation(), is(empty()));
+       }
+
+       @Test
+       public void testReleaseSlotsReturnsSlot() {
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder().build();
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+
+               final LocalTaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+               final FreeSlotConsumer freeSlotConsumer = new 
FreeSlotConsumer();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(freeSlotConsumer)
+                       .createTestingTaskExecutorGateway();
+
+               final Collection<SlotOffer> slotOffers = 
increaseRequirementsAndOfferSlotsToSlotPool(
+                       slotPool,
+                       resourceRequirements,
+                       taskManagerLocation,
+                       testingTaskExecutorGateway);
+
+               slotPool.releaseSlots(taskManagerLocation.getResourceID(), new 
FlinkException("Test failure"));
+
+               final Collection<AllocationID> freedSlots = 
freeSlotConsumer.drainFreedSlots();
+
+               assertThat(freedSlots, 
containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
+       }
+
+       @Test
+       public void testFailSlotDecreasesResources() throws 
InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+               increaseRequirementsAndOfferSlotsToSlotPool(slotPool, 
resourceRequirements, null);
+
+               final Collection<? extends PhysicalSlot> physicalSlots = 
notifyNewSlots.takeNewSlots();
+
+               final PhysicalSlot physicalSlot = 
physicalSlots.iterator().next();
+
+               slotPool.releaseSlot(physicalSlot.getAllocationId(), new 
FlinkException("Test failure"));
+
+               final ResourceCounter finalResourceRequirements = 
resourceRequirements.subtract(physicalSlot.getResourceProfile(), 1);
+               assertThat(slotPool.getFulfilledResourceRequirements(), 
is(finalResourceRequirements));
+       }
+
+       @Test
+       public void testFailSlotReturnsSlot() throws InterruptedException {
+               final NewSlotsService notifyNewSlots = new NewSlotsService();
+               final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots);
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+               final FreeSlotConsumer freeSlotConsumer = new 
FreeSlotConsumer();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(freeSlotConsumer)
+                       .createTestingTaskExecutorGateway();
+
+               increaseRequirementsAndOfferSlotsToSlotPool(
+                       slotPool,
+                       resourceRequirements,
+                       new LocalTaskManagerLocation(),
+                       testingTaskExecutorGateway);
+
+               final Collection<? extends PhysicalSlot> physicalSlots = 
notifyNewSlots.takeNewSlots();
+
+               final PhysicalSlot physicalSlot = 
physicalSlots.iterator().next();
+
+               slotPool.releaseSlot(physicalSlot.getAllocationId(), new 
FlinkException("Test failure"));
+
+               final AllocationID freedSlot = 
Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots());
+
+               assertThat(freedSlot, is(physicalSlot.getAllocationId()));
+       }
+
+       @Test
+       public void testReturnIdleSlotsAfterTimeout() {
+               final Time idleSlotTimeout = Time.seconds(10);
+               final long offerTime = 0;
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder()
+                       .setIdleSlotTimeout(idleSlotTimeout)
+                       .build();
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+               final FreeSlotConsumer freeSlotConsumer = new 
FreeSlotConsumer();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setFreeSlotFunction(freeSlotConsumer)
+                       .createTestingTaskExecutorGateway();
+
+               final Collection<SlotOffer> acceptedSlots = 
increaseRequirementsAndOfferSlotsToSlotPool(
+                       slotPool,
+                       resourceRequirements,
+                       new LocalTaskManagerLocation(),
+                       testingTaskExecutorGateway);
+
+               // decrease the resource requirements so that slots are no 
longer needed
+               slotPool.decreaseResourceRequirementsBy(resourceRequirements);
+
+               slotPool.releaseIdleSlots(offerTime + 
idleSlotTimeout.toMilliseconds());
+
+               final Collection<AllocationID> freedSlots = 
freeSlotConsumer.drainFreedSlots();
+
+               assertThat(acceptedSlots, is(not(empty())));
+               assertThat(freedSlots, 
containsInAnyOrder(acceptedSlots.stream().map(SlotOffer::getAllocationId).toArray()));
+               assertNoAvailableAndRequiredResources(slotPool);
+       }
+
+       private void 
assertNoAvailableAndRequiredResources(DefaultDeclarativeSlotPool slotPool) {
+               
assertTrue(slotPool.getFulfilledResourceRequirements().isEmpty());
+               assertTrue(slotPool.getResourceRequirements().isEmpty());
+               assertThat(slotPool.getAllSlotsInformation(), is(empty()));
+       }
+
+       @Test
+       public void testOnlyReturnExcessIdleSlots() {
+               final Time idleSlotTimeout = Time.seconds(10);
+               final long offerTime = 0;
+               final DefaultDeclarativeSlotPool slotPool = 
DefaultDeclarativeSlotPoolBuilder.builder()
+                       .setIdleSlotTimeout(idleSlotTimeout)
+                       .build();
+
+               final ResourceCounter resourceRequirements = 
createResourceRequirements();
+               final Collection<SlotOffer> slotOffers = 
createSlotOffersForResourceRequirements(resourceRequirements);
+
+               slotPool.increaseResourceRequirementsBy(resourceRequirements);
+               final Collection<SlotOffer> acceptedSlots = 
offerSlots(slotPool, slotOffers);
+
+               final ResourceCounter requiredResources = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
+               final ResourceCounter excessRequirements = 
resourceRequirements.subtract(requiredResources);
+               slotPool.decreaseResourceRequirementsBy(excessRequirements);
+
+               slotPool.releaseIdleSlots(offerTime + 
idleSlotTimeout.toMilliseconds());
+
+               assertThat(acceptedSlots, is(not(empty())));
+               assertThat(slotPool.getFulfilledResourceRequirements(), 
is(requiredResources));
+       }
+
+       @Test
+       public void 
testReleasedSlotWillBeUsedToFulfillOutstandingResourceRequirements() throws 
InterruptedException {

Review comment:
       ```suggestion
        public void 
testFreedSlotWillBeUsedToFulfillOutstandingResourceRequirements() throws 
InterruptedException {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to