tillrohrmann commented on a change in pull request #13722: URL: https://github.com/apache/flink/pull/13722#discussion_r524345947
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.DefaultRequirementMatcher; +import org.apache.flink.runtime.slots.RequirementMatcher; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Default {@link DeclarativeSlotPool} implementation. + * + * <p>The implementation collects the current resource requirements and declares them + * at the ResourceManager. Whenever new slots are offered, the slot pool compares the + * offered slots to the set of available and required resources and only accepts those + * slots which are required. + * + * <p>Slots which are released won't be returned directly to their owners. Instead, + * the slot pool implementation will only return them after the idleSlotTimeout has + * been exceeded by a free slot. + * + * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered slots are + * accepted or if an allocated slot should become free after it is being + * {@link #freeReservedSlot released}. Review comment: ```suggestion * {@link #freeReservedSlot freed}. ``` because `release` is used in the API and means that we remove the slot from the pool. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.DefaultRequirementMatcher; +import org.apache.flink.runtime.slots.RequirementMatcher; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Default {@link DeclarativeSlotPool} implementation. 
+ * + * <p>The implementation collects the current resource requirements and declares them + * at the ResourceManager. Whenever new slots are offered, the slot pool compares the + * offered slots to the set of available and required resources and only accepts those + * slots which are required. + * + * <p>Slots which are released won't be returned directly to their owners. Instead, + * the slot pool implementation will only return them after the idleSlotTimeout has + * been exceeded by a free slot. + * + * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered slots are + * accepted or if an allocated slot should become free after it is being + * {@link #freeReservedSlot released}. + */ +public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class); + + private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements; + + private final Consumer<? super Collection<? extends PhysicalSlot>> notifyNewSlots; + + private final Time idleSlotTimeout; + private final Time rpcTimeout; + + private final AllocatedSlotPool slotPool; + + private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings; + + private ResourceCounter totalResourceRequirements; + + private ResourceCounter fulfilledResourceRequirements; + + private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); + + public DefaultDeclarativeSlotPool( + AllocatedSlotPool slotPool, + Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, + Consumer<? super Collection<? 
extends PhysicalSlot>> notifyNewSlots, + Time idleSlotTimeout, + Time rpcTimeout) { + + this.slotPool = slotPool; + this.notifyNewResourceRequirements = notifyNewResourceRequirements; + this.notifyNewSlots = notifyNewSlots; + this.idleSlotTimeout = idleSlotTimeout; + this.rpcTimeout = rpcTimeout; + this.totalResourceRequirements = ResourceCounter.empty(); + this.fulfilledResourceRequirements = ResourceCounter.empty(); + this.slotToRequirementProfileMappings = new HashMap<>(); + } + + @Override + public void increaseResourceRequirementsBy(ResourceCounter increment) { + totalResourceRequirements = totalResourceRequirements.add(increment); + + declareResourceRequirements(); + } + + @Override + public void decreaseResourceRequirementsBy(ResourceCounter decrement) { + totalResourceRequirements = totalResourceRequirements.subtract(decrement); + + declareResourceRequirements(); + } + + private void declareResourceRequirements() { + notifyNewResourceRequirements.accept(getResourceRequirements()); + } + + @Override + public Collection<ResourceRequirement> getResourceRequirements() { + final Collection<ResourceRequirement> currentResourceRequirements = new ArrayList<>(); + + for (Map.Entry<ResourceProfile, Integer> resourceRequirement : totalResourceRequirements.getResourcesWithCount()) { + currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(), resourceRequirement.getValue())); + } + + return currentResourceRequirements; + } + + @Override + public Collection<SlotOffer> offerSlots( + Collection<? 
extends SlotOffer> offers, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + long currentTime) { + + LOG.debug("Received {} slot offers from TaskExecutor {}.", offers.size(), taskManagerLocation); + final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>(); + final Collection<AllocatedSlot> acceptedSlots = new ArrayList<>(); + + for (SlotOffer offer : offers) { + if (slotPool.containsSlot(offer.getAllocationId())) { + // we have already accepted this offer + acceptedSlotOffers.add(offer); + } else { + Optional<AllocatedSlot> acceptedSlot = matchOfferWithOutstandingRequirements(offer, taskManagerLocation, taskManagerGateway); + if (acceptedSlot.isPresent()) { + acceptedSlotOffers.add(offer); Review comment: We could log at debug level which offers match which requirements. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.DefaultRequirementMatcher; +import org.apache.flink.runtime.slots.RequirementMatcher; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Default {@link DeclarativeSlotPool} implementation. + * + * <p>The implementation collects the current resource requirements and declares them + * at the ResourceManager. Whenever new slots are offered, the slot pool compares the + * offered slots to the set of available and required resources and only accepts those + * slots which are required. + * + * <p>Slots which are released won't be returned directly to their owners. Instead, + * the slot pool implementation will only return them after the idleSlotTimeout has + * been exceeded by a free slot. 
+ * + * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered slots are + * accepted or if an allocated slot should become free after it is being + * {@link #freeReservedSlot released}. + */ +public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class); + + private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements; + + private final Consumer<? super Collection<? extends PhysicalSlot>> notifyNewSlots; + + private final Time idleSlotTimeout; + private final Time rpcTimeout; + + private final AllocatedSlotPool slotPool; + + private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings; + + private ResourceCounter totalResourceRequirements; + + private ResourceCounter fulfilledResourceRequirements; + + private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); + + public DefaultDeclarativeSlotPool( + AllocatedSlotPool slotPool, + Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, + Consumer<? super Collection<? 
extends PhysicalSlot>> notifyNewSlots, + Time idleSlotTimeout, + Time rpcTimeout) { + + this.slotPool = slotPool; + this.notifyNewResourceRequirements = notifyNewResourceRequirements; + this.notifyNewSlots = notifyNewSlots; + this.idleSlotTimeout = idleSlotTimeout; + this.rpcTimeout = rpcTimeout; + this.totalResourceRequirements = ResourceCounter.empty(); + this.fulfilledResourceRequirements = ResourceCounter.empty(); + this.slotToRequirementProfileMappings = new HashMap<>(); + } + + @Override + public void increaseResourceRequirementsBy(ResourceCounter increment) { + totalResourceRequirements = totalResourceRequirements.add(increment); + + declareResourceRequirements(); + } + + @Override + public void decreaseResourceRequirementsBy(ResourceCounter decrement) { + totalResourceRequirements = totalResourceRequirements.subtract(decrement); + + declareResourceRequirements(); + } + + private void declareResourceRequirements() { + notifyNewResourceRequirements.accept(getResourceRequirements()); + } + + @Override + public Collection<ResourceRequirement> getResourceRequirements() { + final Collection<ResourceRequirement> currentResourceRequirements = new ArrayList<>(); + + for (Map.Entry<ResourceProfile, Integer> resourceRequirement : totalResourceRequirements.getResourcesWithCount()) { + currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(), resourceRequirement.getValue())); + } + + return currentResourceRequirements; + } + + @Override + public Collection<SlotOffer> offerSlots( + Collection<? 
extends SlotOffer> offers, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + long currentTime) { + + LOG.debug("Received {} slot offers from TaskExecutor {}.", offers.size(), taskManagerLocation); + final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>(); + final Collection<AllocatedSlot> acceptedSlots = new ArrayList<>(); + + for (SlotOffer offer : offers) { + if (slotPool.containsSlot(offer.getAllocationId())) { + // we have already accepted this offer + acceptedSlotOffers.add(offer); + } else { + Optional<AllocatedSlot> acceptedSlot = matchOfferWithOutstandingRequirements(offer, taskManagerLocation, taskManagerGateway); + if (acceptedSlot.isPresent()) { + acceptedSlotOffers.add(offer); + acceptedSlots.add(acceptedSlot.get()); + } + } + } + + slotPool.addSlots(acceptedSlots, currentTime); + + if (!acceptedSlots.isEmpty()) { + notifyNewSlots.accept(acceptedSlots); + } + + return acceptedSlotOffers; + } + + private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + + final Optional<ResourceProfile> match = requirementMatcher.match( + slotOffer.getResourceProfile(), + totalResourceRequirements.getResourcesWithCount(), + fulfilledResourceRequirements::getResourceCount); + + if (match.isPresent()) { + increaseAvailableResources(ResourceCounter.withResource(match.get(), 1)); + + final AllocatedSlot allocatedSlot = createAllocatedSlot( + slotOffer, + taskManagerLocation, + taskManagerGateway); + + // store the ResourceProfile against which the given slot has matched for future book-keeping + slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), match.get()); + + return Optional.of(allocatedSlot); + } + return Optional.empty(); + } + + @VisibleForTesting + ResourceCounter calculateUnfulfilledResources() { + return totalResourceRequirements.subtract(fulfilledResourceRequirements); + } + + 
private AllocatedSlot createAllocatedSlot( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + return new AllocatedSlot( + slotOffer.getAllocationId(), + taskManagerLocation, + slotOffer.getSlotIndex(), + slotOffer.getResourceProfile(), + taskManagerGateway); + } + + private void increaseAvailableResources(ResourceCounter acceptedResources) { + fulfilledResourceRequirements = fulfilledResourceRequirements.add(acceptedResources); + } + + @Nonnull + private ResourceProfile getMatchingResourceProfile(AllocationID allocationId) { + return Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), "No matching resource profile found for %s", allocationId); + } + + @Override + public PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile) { + final AllocatedSlot allocatedSlot = slotPool.reserveFreeSlot(allocationId); + + Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile), ""); + + ResourceProfile previouslyMatchedResourceProfile = Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId)); + + if (!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) { + // slots can be reserved for a requirement that is not in line with the mapping we computed when the slot was + // offered, so we have to update the mapping adjust the requirements accordingly to ensure we still request enough slots to + // be able to fulfill the total requirements Review comment: Add debug logging that reports the requirements adjustment. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.DefaultRequirementMatcher; +import org.apache.flink.runtime.slots.RequirementMatcher; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + 
+/** + * Default {@link DeclarativeSlotPool} implementation. + * + * <p>The implementation collects the current resource requirements and declares them + * at the ResourceManager. Whenever new slots are offered, the slot pool compares the + * offered slots to the set of available and required resources and only accepts those + * slots which are required. + * + * <p>Slots which are released won't be returned directly to their owners. Instead, + * the slot pool implementation will only return them after the idleSlotTimeout has + * been exceeded by a free slot. + * + * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered slots are + * accepted or if an allocated slot should become free after it is being + * {@link #freeReservedSlot released}. + */ +public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class); + + private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements; + + private final Consumer<? super Collection<? extends PhysicalSlot>> notifyNewSlots; + + private final Time idleSlotTimeout; + private final Time rpcTimeout; + + private final AllocatedSlotPool slotPool; + + private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings; + + private ResourceCounter totalResourceRequirements; + + private ResourceCounter fulfilledResourceRequirements; + + private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); + + public DefaultDeclarativeSlotPool( + AllocatedSlotPool slotPool, + Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, + Consumer<? super Collection<? 
extends PhysicalSlot>> notifyNewSlots, + Time idleSlotTimeout, + Time rpcTimeout) { + + this.slotPool = slotPool; + this.notifyNewResourceRequirements = notifyNewResourceRequirements; + this.notifyNewSlots = notifyNewSlots; + this.idleSlotTimeout = idleSlotTimeout; + this.rpcTimeout = rpcTimeout; + this.totalResourceRequirements = ResourceCounter.empty(); + this.fulfilledResourceRequirements = ResourceCounter.empty(); + this.slotToRequirementProfileMappings = new HashMap<>(); + } + + @Override + public void increaseResourceRequirementsBy(ResourceCounter increment) { + totalResourceRequirements = totalResourceRequirements.add(increment); + + declareResourceRequirements(); + } + + @Override + public void decreaseResourceRequirementsBy(ResourceCounter decrement) { + totalResourceRequirements = totalResourceRequirements.subtract(decrement); + + declareResourceRequirements(); + } + + private void declareResourceRequirements() { + notifyNewResourceRequirements.accept(getResourceRequirements()); + } + + @Override + public Collection<ResourceRequirement> getResourceRequirements() { + final Collection<ResourceRequirement> currentResourceRequirements = new ArrayList<>(); + + for (Map.Entry<ResourceProfile, Integer> resourceRequirement : totalResourceRequirements.getResourcesWithCount()) { + currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(), resourceRequirement.getValue())); + } + + return currentResourceRequirements; + } + + @Override + public Collection<SlotOffer> offerSlots( + Collection<? 
extends SlotOffer> offers, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + long currentTime) { + + LOG.debug("Received {} slot offers from TaskExecutor {}.", offers.size(), taskManagerLocation); + final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>(); + final Collection<AllocatedSlot> acceptedSlots = new ArrayList<>(); + + for (SlotOffer offer : offers) { + if (slotPool.containsSlot(offer.getAllocationId())) { + // we have already accepted this offer + acceptedSlotOffers.add(offer); + } else { + Optional<AllocatedSlot> acceptedSlot = matchOfferWithOutstandingRequirements(offer, taskManagerLocation, taskManagerGateway); + if (acceptedSlot.isPresent()) { + acceptedSlotOffers.add(offer); + acceptedSlots.add(acceptedSlot.get()); + } + } + } + + slotPool.addSlots(acceptedSlots, currentTime); + + if (!acceptedSlots.isEmpty()) { + notifyNewSlots.accept(acceptedSlots); + } + + return acceptedSlotOffers; + } + + private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + + final Optional<ResourceProfile> match = requirementMatcher.match( + slotOffer.getResourceProfile(), + totalResourceRequirements.getResourcesWithCount(), + fulfilledResourceRequirements::getResourceCount); + + if (match.isPresent()) { + increaseAvailableResources(ResourceCounter.withResource(match.get(), 1)); + + final AllocatedSlot allocatedSlot = createAllocatedSlot( + slotOffer, + taskManagerLocation, + taskManagerGateway); + + // store the ResourceProfile against which the given slot has matched for future book-keeping + slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), match.get()); + + return Optional.of(allocatedSlot); + } + return Optional.empty(); + } + + @VisibleForTesting + ResourceCounter calculateUnfulfilledResources() { + return totalResourceRequirements.subtract(fulfilledResourceRequirements); + } + + 
private AllocatedSlot createAllocatedSlot( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + return new AllocatedSlot( + slotOffer.getAllocationId(), + taskManagerLocation, + slotOffer.getSlotIndex(), + slotOffer.getResourceProfile(), + taskManagerGateway); + } + + private void increaseAvailableResources(ResourceCounter acceptedResources) { + fulfilledResourceRequirements = fulfilledResourceRequirements.add(acceptedResources); + } + + @Nonnull + private ResourceProfile getMatchingResourceProfile(AllocationID allocationId) { + return Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), "No matching resource profile found for %s", allocationId); + } + + @Override + public PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile) { + final AllocatedSlot allocatedSlot = slotPool.reserveFreeSlot(allocationId); + + Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile), ""); + + ResourceProfile previouslyMatchedResourceProfile = Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId)); + + if (!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) { + // slots can be reserved for a requirement that is not in line with the mapping we computed when the slot was + // offered, so we have to update the mapping adjust the requirements accordingly to ensure we still request enough slots to + // be able to fulfill the total requirements + updateSlotToRequirementProfileMapping(allocationId, requiredSlotProfile); + adjustRequirements(previouslyMatchedResourceProfile, requiredSlotProfile); + } + + return allocatedSlot; + } + + @Override + public ResourceCounter freeReservedSlot(AllocationID allocationId, @Nullable Throwable cause, long currentTime) { + LOG.debug("Release slot {}.", allocationId); Review comment: Let's be consistent with the terms and use `Free reserved slot {}.` here. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java ########## @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import 
org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link DefaultDeclarativeSlotPool}. 
+ */ +public class DefaultDeclarativeSlotPoolTest extends TestLogger { + + private static final ResourceProfile RESOURCE_PROFILE_1 = ResourceProfile.newBuilder().setCpuCores(1.7).build(); + private static final ResourceProfile RESOURCE_PROFILE_2 = ResourceProfile.newBuilder().setManagedMemoryMB(100).build(); + + @Test + public void testIncreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { + final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); + final DeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(requirementsListener); + + final ResourceCounter increment1 = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1); + final ResourceCounter increment2 = createResourceRequirements(); + slotPool.increaseResourceRequirementsBy(increment1); + slotPool.increaseResourceRequirementsBy(increment2); + + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(increment1))); + + final ResourceCounter totalResources = increment1.add(increment2); + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(totalResources))); + assertThat(requirementsListener.hasNextResourceRequirements(), is(false)); + } + + @Test + public void testDecreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { + final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(requirementsListener); + + final ResourceCounter increment = ResourceCounter.withResource(RESOURCE_PROFILE_1, 3); + slotPool.increaseResourceRequirementsBy(increment); + + requirementsListener.takeResourceRequirements(); + + final ResourceCounter decrement = ResourceCounter.withResource(RESOURCE_PROFILE_1, 2); + slotPool.decreaseResourceRequirementsBy(decrement); + + final ResourceCounter totalResources = 
increment.subtract(decrement); + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(totalResources))); + assertThat(requirementsListener.hasNextResourceRequirements(), is(false)); + } + + @Test + public void testGetResourceRequirements() { + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build(); + + assertThat(slotPool.getResourceRequirements(), is(toResourceRequirements(ResourceCounter.empty()))); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + assertThat(slotPool.getResourceRequirements(), is(toResourceRequirements(resourceRequirements))); + } + + @Test + public void testOfferSlots() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + assertThat(acceptedSlots, containsInAnyOrder(slotOffers.toArray())); + + final Collection<PhysicalSlot> newSlots = drainNewSlotService(notifyNewSlots); + + assertThat(newSlots, containsInAnyOrder(slotOffers.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList()))); + assertThat(slotPool.getAllSlotsInformation(), containsInAnyOrder(newSlots.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList()))); + } + + @Test + public void testDuplicateSlotOfferings() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + offerSlots(slotPool, slotOffers); + + drainNewSlotService(notifyNewSlots); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + assertThat(acceptedSlots, containsInAnyOrder(slotOffers.toArray())); + // duplicate slots should not trigger notify new slots + assertFalse(notifyNewSlots.hasNextNewSlots()); + } + + @Test + public void testOfferingTooManySlots() { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + final ResourceCounter increasedRequirements = resourceRequirements.add(RESOURCE_PROFILE_1, 2); + + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(increasedRequirements); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + final Map<ResourceProfile, Long> resourceProfileCount = acceptedSlots.stream().map(SlotOffer::getResourceProfile).collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + + for (Map.Entry<ResourceProfile, Integer> resourceCount : resourceRequirements.getResourcesWithCount()) { + assertThat(resourceProfileCount.getOrDefault(resourceCount.getKey(), 0L), is((long) resourceCount.getValue())); + } + } + + @Test + public void testReleaseSlotsRemovesSlots() throws InterruptedException { + final NewResourceRequirementsService notifyNewResourceRequirements = new NewResourceRequirementsService(); + final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewResourceRequirements); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + increaseRequirementsAndOfferSlotsToSlotPool(slotPool, createResourceRequirements(), taskManagerLocation); + + notifyNewResourceRequirements.takeResourceRequirements(); + + slotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure")); + assertThat(slotPool.getAllSlotsInformation(), is(empty())); + } + + @Test + public void testReleaseSlotsReturnsSlot() { + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build(); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(); + + final Collection<SlotOffer> slotOffers = increaseRequirementsAndOfferSlotsToSlotPool( + slotPool, + resourceRequirements, + taskManagerLocation, + testingTaskExecutorGateway); + + slotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure")); + + final Collection<AllocationID> freedSlots = freeSlotConsumer.drainFreedSlots(); + + assertThat(freedSlots, containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray())); + } + + @Test + public void testFailSlotDecreasesResources() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, null); + + final Collection<? 
extends PhysicalSlot> physicalSlots = notifyNewSlots.takeNewSlots(); + + final PhysicalSlot physicalSlot = physicalSlots.iterator().next(); + + slotPool.releaseSlot(physicalSlot.getAllocationId(), new FlinkException("Test failure")); + + final ResourceCounter finalResourceRequirements = resourceRequirements.subtract(physicalSlot.getResourceProfile(), 1); + assertThat(slotPool.getFulfilledResourceRequirements(), is(finalResourceRequirements)); + } + + @Test + public void testFailSlotReturnsSlot() throws InterruptedException { Review comment: Please rename this test to `testReleaseSlotReturnsSlot`. It exercises `releaseSlot` rather than a slot failure, and the new name also matches the sibling test `testReleaseSlotsReturnsSlot`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java ########## @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.assertFalse; 
+import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link DefaultDeclarativeSlotPool}. + */ +public class DefaultDeclarativeSlotPoolTest extends TestLogger { + + private static final ResourceProfile RESOURCE_PROFILE_1 = ResourceProfile.newBuilder().setCpuCores(1.7).build(); + private static final ResourceProfile RESOURCE_PROFILE_2 = ResourceProfile.newBuilder().setManagedMemoryMB(100).build(); + + @Test + public void testIncreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { + final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); + final DeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(requirementsListener); + + final ResourceCounter increment1 = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1); + final ResourceCounter increment2 = createResourceRequirements(); + slotPool.increaseResourceRequirementsBy(increment1); + slotPool.increaseResourceRequirementsBy(increment2); + + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(increment1))); + + final ResourceCounter totalResources = increment1.add(increment2); + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(totalResources))); + assertThat(requirementsListener.hasNextResourceRequirements(), is(false)); + } + + @Test + public void testDecreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { + final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(requirementsListener); + + final ResourceCounter increment = ResourceCounter.withResource(RESOURCE_PROFILE_1, 3); + slotPool.increaseResourceRequirementsBy(increment); + + requirementsListener.takeResourceRequirements(); + + final ResourceCounter decrement = 
ResourceCounter.withResource(RESOURCE_PROFILE_1, 2); + slotPool.decreaseResourceRequirementsBy(decrement); + + final ResourceCounter totalResources = increment.subtract(decrement); + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(totalResources))); + assertThat(requirementsListener.hasNextResourceRequirements(), is(false)); + } + + @Test + public void testGetResourceRequirements() { + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build(); + + assertThat(slotPool.getResourceRequirements(), is(toResourceRequirements(ResourceCounter.empty()))); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + assertThat(slotPool.getResourceRequirements(), is(toResourceRequirements(resourceRequirements))); + } + + @Test + public void testOfferSlots() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + assertThat(acceptedSlots, containsInAnyOrder(slotOffers.toArray())); + + final Collection<PhysicalSlot> newSlots = drainNewSlotService(notifyNewSlots); + + assertThat(newSlots, containsInAnyOrder(slotOffers.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList()))); + assertThat(slotPool.getAllSlotsInformation(), containsInAnyOrder(newSlots.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList()))); + } + + @Test + public void testDuplicateSlotOfferings() throws 
InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + offerSlots(slotPool, slotOffers); + + drainNewSlotService(notifyNewSlots); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + assertThat(acceptedSlots, containsInAnyOrder(slotOffers.toArray())); + // duplicate slots should not trigger notify new slots + assertFalse(notifyNewSlots.hasNextNewSlots()); + } + + @Test + public void testOfferingTooManySlots() { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + final ResourceCounter increasedRequirements = resourceRequirements.add(RESOURCE_PROFILE_1, 2); + + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(increasedRequirements); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + final Map<ResourceProfile, Long> resourceProfileCount = acceptedSlots.stream().map(SlotOffer::getResourceProfile).collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + + for (Map.Entry<ResourceProfile, Integer> resourceCount : resourceRequirements.getResourcesWithCount()) { + assertThat(resourceProfileCount.getOrDefault(resourceCount.getKey(), 0L), is((long) resourceCount.getValue())); + } + } + + @Test + public void testReleaseSlotsRemovesSlots() throws InterruptedException { + final NewResourceRequirementsService 
notifyNewResourceRequirements = new NewResourceRequirementsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewResourceRequirements); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + increaseRequirementsAndOfferSlotsToSlotPool(slotPool, createResourceRequirements(), taskManagerLocation); + + notifyNewResourceRequirements.takeResourceRequirements(); + + slotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure")); + assertThat(slotPool.getAllSlotsInformation(), is(empty())); + } + + @Test + public void testReleaseSlotsReturnsSlot() { + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build(); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(); + + final Collection<SlotOffer> slotOffers = increaseRequirementsAndOfferSlotsToSlotPool( + slotPool, + resourceRequirements, + taskManagerLocation, + testingTaskExecutorGateway); + + slotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure")); + + final Collection<AllocationID> freedSlots = freeSlotConsumer.drainFreedSlots(); + + assertThat(freedSlots, containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray())); + } + + @Test + public void testFailSlotDecreasesResources() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + 
increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, null); + + final Collection<? extends PhysicalSlot> physicalSlots = notifyNewSlots.takeNewSlots(); + + final PhysicalSlot physicalSlot = physicalSlots.iterator().next(); + + slotPool.releaseSlot(physicalSlot.getAllocationId(), new FlinkException("Test failure")); + + final ResourceCounter finalResourceRequirements = resourceRequirements.subtract(physicalSlot.getResourceProfile(), 1); + assertThat(slotPool.getFulfilledResourceRequirements(), is(finalResourceRequirements)); + } + + @Test + public void testFailSlotReturnsSlot() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(); + + increaseRequirementsAndOfferSlotsToSlotPool( + slotPool, + resourceRequirements, + new LocalTaskManagerLocation(), + testingTaskExecutorGateway); + + final Collection<? 
extends PhysicalSlot> physicalSlots = notifyNewSlots.takeNewSlots(); + + final PhysicalSlot physicalSlot = physicalSlots.iterator().next(); + + slotPool.releaseSlot(physicalSlot.getAllocationId(), new FlinkException("Test failure")); + + final AllocationID freedSlot = Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots()); + + assertThat(freedSlot, is(physicalSlot.getAllocationId())); + } + + @Test + public void testReturnIdleSlotsAfterTimeout() { + final Time idleSlotTimeout = Time.seconds(10); + final long offerTime = 0; + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder() + .setIdleSlotTimeout(idleSlotTimeout) + .build(); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(); + + final Collection<SlotOffer> acceptedSlots = increaseRequirementsAndOfferSlotsToSlotPool( + slotPool, + resourceRequirements, + new LocalTaskManagerLocation(), + testingTaskExecutorGateway); + + // decrease the resource requirements so that slots are no longer needed + slotPool.decreaseResourceRequirementsBy(resourceRequirements); + + slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds()); + + final Collection<AllocationID> freedSlots = freeSlotConsumer.drainFreedSlots(); + + assertThat(acceptedSlots, is(not(empty()))); + assertThat(freedSlots, containsInAnyOrder(acceptedSlots.stream().map(SlotOffer::getAllocationId).toArray())); + assertNoAvailableAndRequiredResources(slotPool); + } + + private void assertNoAvailableAndRequiredResources(DefaultDeclarativeSlotPool slotPool) { + assertTrue(slotPool.getFulfilledResourceRequirements().isEmpty()); + assertTrue(slotPool.getResourceRequirements().isEmpty()); + assertThat(slotPool.getAllSlotsInformation(), 
is(empty())); + } + + @Test + public void testOnlyReturnExcessIdleSlots() { + final Time idleSlotTimeout = Time.seconds(10); + final long offerTime = 0; + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder() + .setIdleSlotTimeout(idleSlotTimeout) + .build(); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + final ResourceCounter requiredResources = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1); + final ResourceCounter excessRequirements = resourceRequirements.subtract(requiredResources); + slotPool.decreaseResourceRequirementsBy(excessRequirements); + + slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds()); + + assertThat(acceptedSlots, is(not(empty()))); + assertThat(slotPool.getFulfilledResourceRequirements(), is(requiredResources)); + } + + @Test + public void testReleasedSlotWillBeUsedToFulfillOutstandingResourceRequirements() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter initialRequirements = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1); + + increaseRequirementsAndOfferSlotsToSlotPool(slotPool, initialRequirements, null); + + final Collection<PhysicalSlot> newSlots = drainNewSlotService(notifyNewSlots); + final PhysicalSlot newSlot = Iterables.getOnlyElement(newSlots); + + slotPool.reserveFreeSlot(newSlot.getAllocationId(), RESOURCE_PROFILE_1); + slotPool.freeReservedSlot(newSlot.getAllocationId(), null, 0); + + final Collection<PhysicalSlot> recycledSlots = drainNewSlotService(notifyNewSlots); + + 
assertThat(Iterables.getOnlyElement(recycledSlots), sameInstance(newSlot)); + + final Collection<SlotOffer> newSlotOffers = createSlotOffersForResourceRequirements(initialRequirements); + + // the pending requirement should be fulfilled by the released slot --> rejecting new slot offers Review comment: ```suggestion // the pending requirement should be fulfilled by the freed slot --> rejecting new slot offers ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.DefaultRequirementMatcher; +import org.apache.flink.runtime.slots.RequirementMatcher; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Default {@link DeclarativeSlotPool} implementation. + * + * <p>The implementation collects the current resource requirements and declares them + * at the ResourceManager. Whenever new slots are offered, the slot pool compares the + * offered slots to the set of available and required resources and only accepts those + * slots which are required. + * + * <p>Slots which are released won't be returned directly to their owners. Instead, + * the slot pool implementation will only return them after the idleSlotTimeout has + * been exceeded by a free slot. 
+ * + * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered slots are + * accepted or if an allocated slot should become free after it is being + * {@link #freeReservedSlot released}. + */ +public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class); + + private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements; + + private final Consumer<? super Collection<? extends PhysicalSlot>> notifyNewSlots; + + private final Time idleSlotTimeout; + private final Time rpcTimeout; + + private final AllocatedSlotPool slotPool; + + private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings; + + private ResourceCounter totalResourceRequirements; + + private ResourceCounter fulfilledResourceRequirements; + + private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); + + public DefaultDeclarativeSlotPool( + AllocatedSlotPool slotPool, + Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, + Consumer<? super Collection<? 
extends PhysicalSlot>> notifyNewSlots, + Time idleSlotTimeout, + Time rpcTimeout) { + + this.slotPool = slotPool; + this.notifyNewResourceRequirements = notifyNewResourceRequirements; + this.notifyNewSlots = notifyNewSlots; + this.idleSlotTimeout = idleSlotTimeout; + this.rpcTimeout = rpcTimeout; + this.totalResourceRequirements = ResourceCounter.empty(); + this.fulfilledResourceRequirements = ResourceCounter.empty(); + this.slotToRequirementProfileMappings = new HashMap<>(); + } + + @Override + public void increaseResourceRequirementsBy(ResourceCounter increment) { + totalResourceRequirements = totalResourceRequirements.add(increment); + + declareResourceRequirements(); + } + + @Override + public void decreaseResourceRequirementsBy(ResourceCounter decrement) { + totalResourceRequirements = totalResourceRequirements.subtract(decrement); + + declareResourceRequirements(); + } + + private void declareResourceRequirements() { + notifyNewResourceRequirements.accept(getResourceRequirements()); + } + + @Override + public Collection<ResourceRequirement> getResourceRequirements() { + final Collection<ResourceRequirement> currentResourceRequirements = new ArrayList<>(); + + for (Map.Entry<ResourceProfile, Integer> resourceRequirement : totalResourceRequirements.getResourcesWithCount()) { + currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(), resourceRequirement.getValue())); + } + + return currentResourceRequirements; + } + + @Override + public Collection<SlotOffer> offerSlots( + Collection<? 
extends SlotOffer> offers, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + long currentTime) { + + LOG.debug("Received {} slot offers from TaskExecutor {}.", offers.size(), taskManagerLocation); + final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>(); + final Collection<AllocatedSlot> acceptedSlots = new ArrayList<>(); + + for (SlotOffer offer : offers) { + if (slotPool.containsSlot(offer.getAllocationId())) { + // we have already accepted this offer + acceptedSlotOffers.add(offer); + } else { + Optional<AllocatedSlot> acceptedSlot = matchOfferWithOutstandingRequirements(offer, taskManagerLocation, taskManagerGateway); + if (acceptedSlot.isPresent()) { + acceptedSlotOffers.add(offer); + acceptedSlots.add(acceptedSlot.get()); + } + } + } + + slotPool.addSlots(acceptedSlots, currentTime); + + if (!acceptedSlots.isEmpty()) { + notifyNewSlots.accept(acceptedSlots); + } + + return acceptedSlotOffers; + } + + private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + + final Optional<ResourceProfile> match = requirementMatcher.match( + slotOffer.getResourceProfile(), + totalResourceRequirements.getResourcesWithCount(), + fulfilledResourceRequirements::getResourceCount); + + if (match.isPresent()) { + increaseAvailableResources(ResourceCounter.withResource(match.get(), 1)); + + final AllocatedSlot allocatedSlot = createAllocatedSlot( + slotOffer, + taskManagerLocation, + taskManagerGateway); + + // store the ResourceProfile against which the given slot has matched for future book-keeping + slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), match.get()); + + return Optional.of(allocatedSlot); + } + return Optional.empty(); + } + + @VisibleForTesting + ResourceCounter calculateUnfulfilledResources() { + return totalResourceRequirements.subtract(fulfilledResourceRequirements); + } + + 
private AllocatedSlot createAllocatedSlot( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + return new AllocatedSlot( + slotOffer.getAllocationId(), + taskManagerLocation, + slotOffer.getSlotIndex(), + slotOffer.getResourceProfile(), + taskManagerGateway); + } + + private void increaseAvailableResources(ResourceCounter acceptedResources) { + fulfilledResourceRequirements = fulfilledResourceRequirements.add(acceptedResources); + } + + @Nonnull + private ResourceProfile getMatchingResourceProfile(AllocationID allocationId) { + return Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), "No matching resource profile found for %s", allocationId); + } + + @Override + public PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile) { + final AllocatedSlot allocatedSlot = slotPool.reserveFreeSlot(allocationId); + + Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile), ""); Review comment: The error message is missing. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java ########## @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; +import static 
org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link DefaultDeclarativeSlotPool}. + */ +public class DefaultDeclarativeSlotPoolTest extends TestLogger { + + private static final ResourceProfile RESOURCE_PROFILE_1 = ResourceProfile.newBuilder().setCpuCores(1.7).build(); + private static final ResourceProfile RESOURCE_PROFILE_2 = ResourceProfile.newBuilder().setManagedMemoryMB(100).build(); + + @Test + public void testIncreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { + final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); + final DeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(requirementsListener); + + final ResourceCounter increment1 = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1); + final ResourceCounter increment2 = createResourceRequirements(); + slotPool.increaseResourceRequirementsBy(increment1); + slotPool.increaseResourceRequirementsBy(increment2); + + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(increment1))); + + final ResourceCounter totalResources = increment1.add(increment2); + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(totalResources))); + assertThat(requirementsListener.hasNextResourceRequirements(), is(false)); + } + + @Test + public void testDecreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { + final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(requirementsListener); + + final ResourceCounter increment = ResourceCounter.withResource(RESOURCE_PROFILE_1, 3); + 
slotPool.increaseResourceRequirementsBy(increment); + + requirementsListener.takeResourceRequirements(); + + final ResourceCounter decrement = ResourceCounter.withResource(RESOURCE_PROFILE_1, 2); + slotPool.decreaseResourceRequirementsBy(decrement); + + final ResourceCounter totalResources = increment.subtract(decrement); + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(totalResources))); + assertThat(requirementsListener.hasNextResourceRequirements(), is(false)); + } + + @Test + public void testGetResourceRequirements() { + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build(); + + assertThat(slotPool.getResourceRequirements(), is(toResourceRequirements(ResourceCounter.empty()))); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + assertThat(slotPool.getResourceRequirements(), is(toResourceRequirements(resourceRequirements))); + } + + @Test + public void testOfferSlots() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + assertThat(acceptedSlots, containsInAnyOrder(slotOffers.toArray())); + + final Collection<PhysicalSlot> newSlots = drainNewSlotService(notifyNewSlots); + + assertThat(newSlots, containsInAnyOrder(slotOffers.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList()))); + assertThat(slotPool.getAllSlotsInformation(), 
containsInAnyOrder(newSlots.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList()))); + } + + @Test + public void testDuplicateSlotOfferings() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + offerSlots(slotPool, slotOffers); + + drainNewSlotService(notifyNewSlots); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + assertThat(acceptedSlots, containsInAnyOrder(slotOffers.toArray())); + // duplicate slots should not trigger notify new slots + assertFalse(notifyNewSlots.hasNextNewSlots()); + } + + @Test + public void testOfferingTooManySlots() { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + final ResourceCounter increasedRequirements = resourceRequirements.add(RESOURCE_PROFILE_1, 2); + + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(increasedRequirements); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + final Map<ResourceProfile, Long> resourceProfileCount = acceptedSlots.stream().map(SlotOffer::getResourceProfile).collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + + for (Map.Entry<ResourceProfile, Integer> resourceCount : resourceRequirements.getResourcesWithCount()) { + assertThat(resourceProfileCount.getOrDefault(resourceCount.getKey(), 0L), 
is((long) resourceCount.getValue())); + } + } + + @Test + public void testReleaseSlotsRemovesSlots() throws InterruptedException { + final NewResourceRequirementsService notifyNewResourceRequirements = new NewResourceRequirementsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewResourceRequirements); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + increaseRequirementsAndOfferSlotsToSlotPool(slotPool, createResourceRequirements(), taskManagerLocation); + + notifyNewResourceRequirements.takeResourceRequirements(); + + slotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure")); + assertThat(slotPool.getAllSlotsInformation(), is(empty())); + } + + @Test + public void testReleaseSlotsReturnsSlot() { + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build(); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(); + + final Collection<SlotOffer> slotOffers = increaseRequirementsAndOfferSlotsToSlotPool( + slotPool, + resourceRequirements, + taskManagerLocation, + testingTaskExecutorGateway); + + slotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure")); + + final Collection<AllocationID> freedSlots = freeSlotConsumer.drainFreedSlots(); + + assertThat(freedSlots, containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray())); + } + + @Test + public void testFailSlotDecreasesResources() throws InterruptedException { Review comment: Maybe better to call this test 
`testReleaseSlotDecreasesFulfilledResourceRequirements`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.DefaultRequirementMatcher; +import org.apache.flink.runtime.slots.RequirementMatcher; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Default {@link DeclarativeSlotPool} implementation. + * + * <p>The implementation collects the current resource requirements and declares them + * at the ResourceManager. Whenever new slots are offered, the slot pool compares the + * offered slots to the set of available and required resources and only accepts those + * slots which are required. + * + * <p>Slots which are released won't be returned directly to their owners. Instead, + * the slot pool implementation will only return them after the idleSlotTimeout has + * been exceeded by a free slot. 
+ * + * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered slots are + * accepted or if an allocated slot should become free after it is being + * {@link #freeReservedSlot released}. + */ +public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class); + + private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements; + + private final Consumer<? super Collection<? extends PhysicalSlot>> notifyNewSlots; + + private final Time idleSlotTimeout; + private final Time rpcTimeout; + + private final AllocatedSlotPool slotPool; + + private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings; + + private ResourceCounter totalResourceRequirements; + + private ResourceCounter fulfilledResourceRequirements; + + private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); + + public DefaultDeclarativeSlotPool( + AllocatedSlotPool slotPool, + Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, + Consumer<? super Collection<? 
extends PhysicalSlot>> notifyNewSlots, + Time idleSlotTimeout, + Time rpcTimeout) { + + this.slotPool = slotPool; + this.notifyNewResourceRequirements = notifyNewResourceRequirements; + this.notifyNewSlots = notifyNewSlots; + this.idleSlotTimeout = idleSlotTimeout; + this.rpcTimeout = rpcTimeout; + this.totalResourceRequirements = ResourceCounter.empty(); + this.fulfilledResourceRequirements = ResourceCounter.empty(); + this.slotToRequirementProfileMappings = new HashMap<>(); + } + + @Override + public void increaseResourceRequirementsBy(ResourceCounter increment) { + totalResourceRequirements = totalResourceRequirements.add(increment); + + declareResourceRequirements(); + } + + @Override + public void decreaseResourceRequirementsBy(ResourceCounter decrement) { + totalResourceRequirements = totalResourceRequirements.subtract(decrement); + + declareResourceRequirements(); + } + + private void declareResourceRequirements() { + notifyNewResourceRequirements.accept(getResourceRequirements()); + } + + @Override + public Collection<ResourceRequirement> getResourceRequirements() { + final Collection<ResourceRequirement> currentResourceRequirements = new ArrayList<>(); + + for (Map.Entry<ResourceProfile, Integer> resourceRequirement : totalResourceRequirements.getResourcesWithCount()) { + currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(), resourceRequirement.getValue())); + } + + return currentResourceRequirements; + } + + @Override + public Collection<SlotOffer> offerSlots( + Collection<? 
extends SlotOffer> offers, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + long currentTime) { + + LOG.debug("Received {} slot offers from TaskExecutor {}.", offers.size(), taskManagerLocation); + final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>(); + final Collection<AllocatedSlot> acceptedSlots = new ArrayList<>(); + + for (SlotOffer offer : offers) { + if (slotPool.containsSlot(offer.getAllocationId())) { + // we have already accepted this offer + acceptedSlotOffers.add(offer); + } else { + Optional<AllocatedSlot> acceptedSlot = matchOfferWithOutstandingRequirements(offer, taskManagerLocation, taskManagerGateway); + if (acceptedSlot.isPresent()) { + acceptedSlotOffers.add(offer); + acceptedSlots.add(acceptedSlot.get()); + } + } + } + + slotPool.addSlots(acceptedSlots, currentTime); + + if (!acceptedSlots.isEmpty()) { + notifyNewSlots.accept(acceptedSlots); + } + + return acceptedSlotOffers; + } + + private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + + final Optional<ResourceProfile> match = requirementMatcher.match( + slotOffer.getResourceProfile(), + totalResourceRequirements.getResourcesWithCount(), + fulfilledResourceRequirements::getResourceCount); + + if (match.isPresent()) { + increaseAvailableResources(ResourceCounter.withResource(match.get(), 1)); + + final AllocatedSlot allocatedSlot = createAllocatedSlot( + slotOffer, + taskManagerLocation, + taskManagerGateway); + + // store the ResourceProfile against which the given slot has matched for future book-keeping + slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), match.get()); + + return Optional.of(allocatedSlot); + } + return Optional.empty(); + } + + @VisibleForTesting + ResourceCounter calculateUnfulfilledResources() { + return totalResourceRequirements.subtract(fulfilledResourceRequirements); + } + + 
private AllocatedSlot createAllocatedSlot( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + return new AllocatedSlot( + slotOffer.getAllocationId(), + taskManagerLocation, + slotOffer.getSlotIndex(), + slotOffer.getResourceProfile(), + taskManagerGateway); + } + + private void increaseAvailableResources(ResourceCounter acceptedResources) { + fulfilledResourceRequirements = fulfilledResourceRequirements.add(acceptedResources); + } + + @Nonnull + private ResourceProfile getMatchingResourceProfile(AllocationID allocationId) { + return Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), "No matching resource profile found for %s", allocationId); + } + + @Override + public PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile) { + final AllocatedSlot allocatedSlot = slotPool.reserveFreeSlot(allocationId); + + Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile), ""); + + ResourceProfile previouslyMatchedResourceProfile = Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId)); + + if (!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) { + // slots can be reserved for a requirement that is not in line with the mapping we computed when the slot was + // offered, so we have to update the mapping adjust the requirements accordingly to ensure we still request enough slots to + // be able to fulfill the total requirements + updateSlotToRequirementProfileMapping(allocationId, requiredSlotProfile); + adjustRequirements(previouslyMatchedResourceProfile, requiredSlotProfile); + } + + return allocatedSlot; + } + + @Override + public ResourceCounter freeReservedSlot(AllocationID allocationId, @Nullable Throwable cause, long currentTime) { + LOG.debug("Release slot {}.", allocationId); + + final Optional<AllocatedSlot> releasedSlot = slotPool.freeReservedSlot(allocationId, 
currentTime); Review comment: ```suggestion final Optional<AllocatedSlot> freedSlot = slotPool.freeReservedSlot(allocationId, currentTime); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.DefaultRequirementMatcher; +import org.apache.flink.runtime.slots.RequirementMatcher; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Default {@link DeclarativeSlotPool} implementation. + * + * <p>The implementation collects the current resource requirements and declares them + * at the ResourceManager. Whenever new slots are offered, the slot pool compares the + * offered slots to the set of available and required resources and only accepts those + * slots which are required. + * + * <p>Slots which are released won't be returned directly to their owners. Instead, + * the slot pool implementation will only return them after the idleSlotTimeout has + * been exceeded by a free slot. 
+ * + * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered slots are + * accepted or if an allocated slot should become free after it is being + * {@link #freeReservedSlot released}. + */ +public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class); + + private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements; + + private final Consumer<? super Collection<? extends PhysicalSlot>> notifyNewSlots; + + private final Time idleSlotTimeout; + private final Time rpcTimeout; + + private final AllocatedSlotPool slotPool; + + private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings; + + private ResourceCounter totalResourceRequirements; + + private ResourceCounter fulfilledResourceRequirements; + + private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); + + public DefaultDeclarativeSlotPool( + AllocatedSlotPool slotPool, + Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, + Consumer<? super Collection<? 
extends PhysicalSlot>> notifyNewSlots, + Time idleSlotTimeout, + Time rpcTimeout) { + + this.slotPool = slotPool; + this.notifyNewResourceRequirements = notifyNewResourceRequirements; + this.notifyNewSlots = notifyNewSlots; + this.idleSlotTimeout = idleSlotTimeout; + this.rpcTimeout = rpcTimeout; + this.totalResourceRequirements = ResourceCounter.empty(); + this.fulfilledResourceRequirements = ResourceCounter.empty(); + this.slotToRequirementProfileMappings = new HashMap<>(); + } + + @Override + public void increaseResourceRequirementsBy(ResourceCounter increment) { + totalResourceRequirements = totalResourceRequirements.add(increment); + + declareResourceRequirements(); + } + + @Override + public void decreaseResourceRequirementsBy(ResourceCounter decrement) { + totalResourceRequirements = totalResourceRequirements.subtract(decrement); + + declareResourceRequirements(); + } + + private void declareResourceRequirements() { + notifyNewResourceRequirements.accept(getResourceRequirements()); + } + + @Override + public Collection<ResourceRequirement> getResourceRequirements() { + final Collection<ResourceRequirement> currentResourceRequirements = new ArrayList<>(); + + for (Map.Entry<ResourceProfile, Integer> resourceRequirement : totalResourceRequirements.getResourcesWithCount()) { + currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(), resourceRequirement.getValue())); + } + + return currentResourceRequirements; + } + + @Override + public Collection<SlotOffer> offerSlots( + Collection<? 
extends SlotOffer> offers, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + long currentTime) { + + LOG.debug("Received {} slot offers from TaskExecutor {}.", offers.size(), taskManagerLocation); + final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>(); + final Collection<AllocatedSlot> acceptedSlots = new ArrayList<>(); + + for (SlotOffer offer : offers) { + if (slotPool.containsSlot(offer.getAllocationId())) { + // we have already accepted this offer + acceptedSlotOffers.add(offer); + } else { + Optional<AllocatedSlot> acceptedSlot = matchOfferWithOutstandingRequirements(offer, taskManagerLocation, taskManagerGateway); + if (acceptedSlot.isPresent()) { + acceptedSlotOffers.add(offer); + acceptedSlots.add(acceptedSlot.get()); + } + } + } + + slotPool.addSlots(acceptedSlots, currentTime); + + if (!acceptedSlots.isEmpty()) { + notifyNewSlots.accept(acceptedSlots); + } + + return acceptedSlotOffers; + } + + private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + + final Optional<ResourceProfile> match = requirementMatcher.match( + slotOffer.getResourceProfile(), + totalResourceRequirements.getResourcesWithCount(), + fulfilledResourceRequirements::getResourceCount); + + if (match.isPresent()) { + increaseAvailableResources(ResourceCounter.withResource(match.get(), 1)); + + final AllocatedSlot allocatedSlot = createAllocatedSlot( + slotOffer, + taskManagerLocation, + taskManagerGateway); + + // store the ResourceProfile against which the given slot has matched for future book-keeping + slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), match.get()); + + return Optional.of(allocatedSlot); + } + return Optional.empty(); + } + + @VisibleForTesting + ResourceCounter calculateUnfulfilledResources() { + return totalResourceRequirements.subtract(fulfilledResourceRequirements); + } + + 
private AllocatedSlot createAllocatedSlot( + SlotOffer slotOffer, + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway) { + return new AllocatedSlot( + slotOffer.getAllocationId(), + taskManagerLocation, + slotOffer.getSlotIndex(), + slotOffer.getResourceProfile(), + taskManagerGateway); + } + + private void increaseAvailableResources(ResourceCounter acceptedResources) { + fulfilledResourceRequirements = fulfilledResourceRequirements.add(acceptedResources); + } + + @Nonnull + private ResourceProfile getMatchingResourceProfile(AllocationID allocationId) { + return Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), "No matching resource profile found for %s", allocationId); + } + + @Override + public PhysicalSlot reserveFreeSlot(AllocationID allocationId, ResourceProfile requiredSlotProfile) { + final AllocatedSlot allocatedSlot = slotPool.reserveFreeSlot(allocationId); + + Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile), ""); + + ResourceProfile previouslyMatchedResourceProfile = Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId)); + + if (!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) { + // slots can be reserved for a requirement that is not in line with the mapping we computed when the slot was + // offered, so we have to update the mapping adjust the requirements accordingly to ensure we still request enough slots to + // be able to fulfill the total requirements + updateSlotToRequirementProfileMapping(allocationId, requiredSlotProfile); + adjustRequirements(previouslyMatchedResourceProfile, requiredSlotProfile); + } + + return allocatedSlot; + } + + @Override + public ResourceCounter freeReservedSlot(AllocationID allocationId, @Nullable Throwable cause, long currentTime) { + LOG.debug("Release slot {}.", allocationId); + + final Optional<AllocatedSlot> releasedSlot = slotPool.freeReservedSlot(allocationId, 
currentTime); + + Optional<ResourceCounter> previouslyFulfilledRequirement = releasedSlot.map(Collections::singleton).map(this::getFulfilledRequirements); + + releasedSlot.ifPresent(allocatedSlot -> { + releasePayload(Collections.singleton(allocatedSlot), cause); + tryToFulfillResourceRequirement(allocatedSlot); + notifyNewSlots.accept(Collections.singletonList(allocatedSlot)); + }); + + return previouslyFulfilledRequirement.orElseGet(ResourceCounter::empty); + } + + private void tryToFulfillResourceRequirement(AllocatedSlot allocatedSlot) { + matchOfferWithOutstandingRequirements(allocatedSlotToSlotOffer(allocatedSlot), allocatedSlot.getTaskManagerLocation(), allocatedSlot.getTaskManagerGateway()); + } + + private void updateSlotToRequirementProfileMapping(AllocationID allocationId, ResourceProfile matchedResourceProfile) { + final ResourceProfile oldResourceProfile = Preconditions.checkNotNull(slotToRequirementProfileMappings.put(allocationId, matchedResourceProfile), "Expected slot profile matching to be non-empty."); + + fulfilledResourceRequirements = fulfilledResourceRequirements.add(matchedResourceProfile, 1); + fulfilledResourceRequirements = fulfilledResourceRequirements.subtract(oldResourceProfile, 1); + } + + private void adjustRequirements(ResourceProfile oldResourceProfile, ResourceProfile newResourceProfile) { + // slots can be reserved for a requirement that is not in line with the mapping we computed when the slot was + // offered, so we have to adjust the requirements accordingly to ensure we still request enough slots to + // be able to fulfill the total requirements + decreaseResourceRequirementsBy(ResourceCounter.withResource(newResourceProfile, 1)); + increaseResourceRequirementsBy(ResourceCounter.withResource(oldResourceProfile, 1)); + } + + @Nonnull + private SlotOffer allocatedSlotToSlotOffer(AllocatedSlot allocatedSlot) { + return new SlotOffer(allocatedSlot.getAllocationId(), allocatedSlot.getPhysicalSlotNumber(), 
allocatedSlot.getResourceProfile()); + } + + @Override + public ResourceCounter releaseSlots(ResourceID owner, Exception cause) { + final Collection<AllocatedSlot> removedSlots = slotPool.removeSlots(owner); + + ResourceCounter previouslyFulfilledRequirements = getFulfilledRequirements(removedSlots); + + releasePayload(removedSlots, cause); + releaseSlots(removedSlots, cause); + + return previouslyFulfilledRequirements; + } + + @Override + public ResourceCounter releaseSlot(AllocationID allocationId, Exception cause) { + final Optional<AllocatedSlot> removedSlot = slotPool.removeSlot(allocationId); + + Optional<ResourceCounter> previouslyFulfilledRequirement = removedSlot.map(Collections::singleton).map(this::getFulfilledRequirements); + + removedSlot.ifPresent(allocatedSlot -> { + releasePayload(Collections.singleton(allocatedSlot), cause); + releaseSlots(Collections.singleton(allocatedSlot), cause); + }); + + return previouslyFulfilledRequirement.orElseGet(ResourceCounter::empty); + } + + private void releasePayload(Iterable<? 
extends AllocatedSlot> allocatedSlots, Throwable cause) { + for (AllocatedSlot allocatedSlot : allocatedSlots) { + allocatedSlot.releasePayload(cause); + } + } + + @Override + public void releaseIdleSlots(long currentTimeMillis) { + final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation = slotPool.getFreeSlotsInformation(); + + ResourceCounter excessResources = fulfilledResourceRequirements.subtract(totalResourceRequirements); + + final Iterator<AllocatedSlotPool.FreeSlotInfo> freeSlotIterator = freeSlotsInformation.iterator(); + + final Collection<AllocatedSlot> slotsToReturnToOwner = new ArrayList<>(); + + while (!excessResources.isEmpty() && freeSlotIterator.hasNext()) { + final AllocatedSlotPool.FreeSlotInfo idleSlot = freeSlotIterator.next(); + + if (currentTimeMillis >= idleSlot.getFreeSince() + idleSlotTimeout.toMilliseconds()) { + final ResourceProfile matchingProfile = getMatchingResourceProfile(idleSlot.getAllocationId()); + + if (excessResources.containsResource(matchingProfile)) { + excessResources = excessResources.subtract(matchingProfile, 1); + final Optional<AllocatedSlot> removedSlot = slotPool.removeSlot(idleSlot.getAllocationId()); + + final AllocatedSlot allocatedSlot = removedSlot.orElseThrow(() -> new IllegalStateException(String.format("Could not find slot for allocation id %s.", idleSlot.getAllocationId()))); + slotsToReturnToOwner.add(allocatedSlot); + } + } + } + + releaseSlots(slotsToReturnToOwner, new FlinkException("Returning idle slots to their owners.")); + } + + private void releaseSlots(Iterable<AllocatedSlot> slotsToReturnToOwner, Throwable cause) { + for (AllocatedSlot slotToReturn : slotsToReturnToOwner) { + Preconditions.checkState(!slotToReturn.isUsed(), "Free slot must not be used."); + + LOG.info("Releasing slot [{}].", slotToReturn.getAllocationId()); Review comment: maybe log on debug the `cause`. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java ########## @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.slots.ResourceRequirement; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import 
org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link DefaultDeclarativeSlotPool}. 
+ */ +public class DefaultDeclarativeSlotPoolTest extends TestLogger { + + private static final ResourceProfile RESOURCE_PROFILE_1 = ResourceProfile.newBuilder().setCpuCores(1.7).build(); + private static final ResourceProfile RESOURCE_PROFILE_2 = ResourceProfile.newBuilder().setManagedMemoryMB(100).build(); + + @Test + public void testIncreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { + final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); + final DeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(requirementsListener); + + final ResourceCounter increment1 = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1); + final ResourceCounter increment2 = createResourceRequirements(); + slotPool.increaseResourceRequirementsBy(increment1); + slotPool.increaseResourceRequirementsBy(increment2); + + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(increment1))); + + final ResourceCounter totalResources = increment1.add(increment2); + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(totalResources))); + assertThat(requirementsListener.hasNextResourceRequirements(), is(false)); + } + + @Test + public void testDecreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException { + final NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(requirementsListener); + + final ResourceCounter increment = ResourceCounter.withResource(RESOURCE_PROFILE_1, 3); + slotPool.increaseResourceRequirementsBy(increment); + + requirementsListener.takeResourceRequirements(); + + final ResourceCounter decrement = ResourceCounter.withResource(RESOURCE_PROFILE_1, 2); + slotPool.decreaseResourceRequirementsBy(decrement); + + final ResourceCounter totalResources = 
increment.subtract(decrement); + assertThat(requirementsListener.takeResourceRequirements(), is(toResourceRequirements(totalResources))); + assertThat(requirementsListener.hasNextResourceRequirements(), is(false)); + } + + @Test + public void testGetResourceRequirements() { + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build(); + + assertThat(slotPool.getResourceRequirements(), is(toResourceRequirements(ResourceCounter.empty()))); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + assertThat(slotPool.getResourceRequirements(), is(toResourceRequirements(resourceRequirements))); + } + + @Test + public void testOfferSlots() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + assertThat(acceptedSlots, containsInAnyOrder(slotOffers.toArray())); + + final Collection<PhysicalSlot> newSlots = drainNewSlotService(notifyNewSlots); + + assertThat(newSlots, containsInAnyOrder(slotOffers.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList()))); + assertThat(slotPool.getAllSlotsInformation(), containsInAnyOrder(newSlots.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList()))); + } + + @Test + public void testDuplicateSlotOfferings() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + offerSlots(slotPool, slotOffers); + + drainNewSlotService(notifyNewSlots); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + assertThat(acceptedSlots, containsInAnyOrder(slotOffers.toArray())); + // duplicate slots should not trigger notify new slots + assertFalse(notifyNewSlots.hasNextNewSlots()); + } + + @Test + public void testOfferingTooManySlots() { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + + final ResourceCounter increasedRequirements = resourceRequirements.add(RESOURCE_PROFILE_1, 2); + + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(increasedRequirements); + + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + final Map<ResourceProfile, Long> resourceProfileCount = acceptedSlots.stream().map(SlotOffer::getResourceProfile).collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + + for (Map.Entry<ResourceProfile, Integer> resourceCount : resourceRequirements.getResourcesWithCount()) { + assertThat(resourceProfileCount.getOrDefault(resourceCount.getKey(), 0L), is((long) resourceCount.getValue())); + } + } + + @Test + public void testReleaseSlotsRemovesSlots() throws InterruptedException { + final NewResourceRequirementsService notifyNewResourceRequirements = new NewResourceRequirementsService(); + final DefaultDeclarativeSlotPool slotPool = 
createDefaultDeclarativeSlotPool(notifyNewResourceRequirements); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + increaseRequirementsAndOfferSlotsToSlotPool(slotPool, createResourceRequirements(), taskManagerLocation); + + notifyNewResourceRequirements.takeResourceRequirements(); + + slotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure")); + assertThat(slotPool.getAllSlotsInformation(), is(empty())); + } + + @Test + public void testReleaseSlotsReturnsSlot() { + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build(); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(); + + final Collection<SlotOffer> slotOffers = increaseRequirementsAndOfferSlotsToSlotPool( + slotPool, + resourceRequirements, + taskManagerLocation, + testingTaskExecutorGateway); + + slotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure")); + + final Collection<AllocationID> freedSlots = freeSlotConsumer.drainFreedSlots(); + + assertThat(freedSlots, containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray())); + } + + @Test + public void testFailSlotDecreasesResources() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, null); + + final Collection<? 
extends PhysicalSlot> physicalSlots = notifyNewSlots.takeNewSlots(); + + final PhysicalSlot physicalSlot = physicalSlots.iterator().next(); + + slotPool.releaseSlot(physicalSlot.getAllocationId(), new FlinkException("Test failure")); + + final ResourceCounter finalResourceRequirements = resourceRequirements.subtract(physicalSlot.getResourceProfile(), 1); + assertThat(slotPool.getFulfilledResourceRequirements(), is(finalResourceRequirements)); + } + + @Test + public void testFailSlotReturnsSlot() throws InterruptedException { + final NewSlotsService notifyNewSlots = new NewSlotsService(); + final DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPool(notifyNewSlots); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(); + + increaseRequirementsAndOfferSlotsToSlotPool( + slotPool, + resourceRequirements, + new LocalTaskManagerLocation(), + testingTaskExecutorGateway); + + final Collection<? 
extends PhysicalSlot> physicalSlots = notifyNewSlots.takeNewSlots(); + + final PhysicalSlot physicalSlot = physicalSlots.iterator().next(); + + slotPool.releaseSlot(physicalSlot.getAllocationId(), new FlinkException("Test failure")); + + final AllocationID freedSlot = Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots()); + + assertThat(freedSlot, is(physicalSlot.getAllocationId())); + } + + @Test + public void testReturnIdleSlotsAfterTimeout() { + final Time idleSlotTimeout = Time.seconds(10); + final long offerTime = 0; + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder() + .setIdleSlotTimeout(idleSlotTimeout) + .build(); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + final FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setFreeSlotFunction(freeSlotConsumer) + .createTestingTaskExecutorGateway(); + + final Collection<SlotOffer> acceptedSlots = increaseRequirementsAndOfferSlotsToSlotPool( + slotPool, + resourceRequirements, + new LocalTaskManagerLocation(), + testingTaskExecutorGateway); + + // decrease the resource requirements so that slots are no longer needed + slotPool.decreaseResourceRequirementsBy(resourceRequirements); + + slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds()); + + final Collection<AllocationID> freedSlots = freeSlotConsumer.drainFreedSlots(); + + assertThat(acceptedSlots, is(not(empty()))); + assertThat(freedSlots, containsInAnyOrder(acceptedSlots.stream().map(SlotOffer::getAllocationId).toArray())); + assertNoAvailableAndRequiredResources(slotPool); + } + + private void assertNoAvailableAndRequiredResources(DefaultDeclarativeSlotPool slotPool) { + assertTrue(slotPool.getFulfilledResourceRequirements().isEmpty()); + assertTrue(slotPool.getResourceRequirements().isEmpty()); + assertThat(slotPool.getAllSlotsInformation(), 
is(empty())); + } + + @Test + public void testOnlyReturnExcessIdleSlots() { + final Time idleSlotTimeout = Time.seconds(10); + final long offerTime = 0; + final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder() + .setIdleSlotTimeout(idleSlotTimeout) + .build(); + + final ResourceCounter resourceRequirements = createResourceRequirements(); + final Collection<SlotOffer> slotOffers = createSlotOffersForResourceRequirements(resourceRequirements); + + slotPool.increaseResourceRequirementsBy(resourceRequirements); + final Collection<SlotOffer> acceptedSlots = offerSlots(slotPool, slotOffers); + + final ResourceCounter requiredResources = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1); + final ResourceCounter excessRequirements = resourceRequirements.subtract(requiredResources); + slotPool.decreaseResourceRequirementsBy(excessRequirements); + + slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds()); + + assertThat(acceptedSlots, is(not(empty()))); + assertThat(slotPool.getFulfilledResourceRequirements(), is(requiredResources)); + } + + @Test + public void testReleasedSlotWillBeUsedToFulfillOutstandingResourceRequirements() throws InterruptedException { Review comment: ```suggestion public void testFreedSlotWillBeUsedToFulfillOutstandingResourceRequirements() throws InterruptedException { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
