This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f1a079f95e409d3a81ff5dd199ea3c0dc30470cc Author: Weihua Hu <[email protected]> AuthorDate: Thu Jun 29 19:20:45 2023 +0800 [FLINK-31843][runtime] SlotSelectionStrategy use FreeSlotInfoTracker to select the best slot. --- .../jobmaster/slotpool/AllocatedSlotPool.java | 7 +++ .../jobmaster/slotpool/DeclarativeSlotPool.java | 7 +++ .../slotpool/DeclarativeSlotPoolBridge.java | 7 +++ .../slotpool/DefaultAllocatedSlotPool.java | 19 ++++++++ .../slotpool/DefaultDeclarativeSlotPool.java | 5 +++ ...ultLocationPreferenceSlotSelectionStrategy.java | 8 ++-- ...OutLocationPreferenceSlotSelectionStrategy.java | 25 ++++++----- .../LocationPreferenceSlotSelectionStrategy.java | 24 +++++----- .../slotpool/PhysicalSlotProviderImpl.java | 17 +++++--- .../PreviousAllocationSlotSelectionStrategy.java | 42 +++++------------- .../flink/runtime/jobmaster/slotpool/SlotPool.java | 7 +++ .../jobmaster/slotpool/SlotSelectionStrategy.java | 8 ++-- ...ocationPreferenceSlotSelectionStrategyTest.java | 15 +++---- .../types/SlotSelectionStrategyTestBase.java | 51 ++++++++++------------ .../flink/runtime/jobmaster/JobMasterTest.java | 11 +++++ .../slotpool/TestingDeclarativeSlotPool.java | 9 ++++ .../TestingDeclarativeSlotPoolBuilder.java | 9 ++++ 17 files changed, 166 insertions(+), 105 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java index 092963c00d1..03152e364cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java @@ -117,6 +117,13 @@ public interface AllocatedSlotPool { */ Collection<FreeSlotInfo> getFreeSlotsInformation(); + /** + * Returns information about all currently free slots. + * + * @return free slot information + */ + FreeSlotInfoTracker getFreeSlotInfoTracker(); + /** * Returns information about all slots in this pool. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java index 9cd89064275..b4a01c9d18b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java @@ -115,6 +115,13 @@ public interface DeclarativeSlotPool { */ Collection<SlotInfoWithUtilization> getFreeSlotsInformation(); + /** + * Returns the free slot tracker. + * + * @return free slot tracker + */ + FreeSlotInfoTracker getFreeSlotInfoTracker(); + /** * Returns the slot information for all slots (free and allocated slots). * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index 3cb0d04ed19..36fb9692726 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -439,6 +439,13 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem return getDeclarativeSlotPool().getFreeSlotsInformation(); } + @Override + public FreeSlotInfoTracker getFreeSlotInfoTracker() { + assertRunningInMainThread(); + + return getDeclarativeSlotPool().getFreeSlotInfoTracker(); + } + @Override public void disableBatchSlotRequestTimeoutCheck() { isBatchSlotRequestTimeoutCheckDisabled = true; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java index e83b84fdca2..c768054e3a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java @@ -205,6 +205,25 @@ public class DefaultAllocatedSlotPool implements AllocatedSlotPool { return freeSlotInfos; } + private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) { + final AllocatedSlot allocatedSlot = + Preconditions.checkNotNull(registeredSlots.get(allocationId)); + final Long idleSince = + Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId)); + final SlotInfoWithUtilization slotInfoWithUtilization = + SlotInfoWithUtilization.from(allocatedSlot, this::getTaskExecutorUtilization); + return DefaultFreeSlotInfo.create(slotInfoWithUtilization, idleSince); + } + + @Override + public FreeSlotInfoTracker getFreeSlotInfoTracker() { + return new DefaultFreeSlotInfoTracker( + freeSlots.getFreeSlotsSince().keySet(), + registeredSlots::get, + this::getFreeSlotInfo, + this::getTaskExecutorUtilization); + } + public double getTaskExecutorUtilization(ResourceID resourceId) { Set<AllocationID> slots = slotsPerTaskExecutor.get(resourceId); Preconditions.checkNotNull(slots, "There is no slots on %s", resourceId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index bd97f3aa7fc..82147530355 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -570,6 +570,11 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { .collect(Collectors.toList()); } + @Override + public FreeSlotInfoTracker getFreeSlotInfoTracker() { + return slotPool.getFreeSlotInfoTracker(); + } + @Override public Collection<? extends SlotInfo> getAllSlotsInformation() { return slotPool.getAllSlotsInformation(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java index a10e5609e68..62bb31518e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultLocationPreferenceSlotSelectionStrategy.java @@ -18,12 +18,13 @@ package org.apache.flink.runtime.jobmaster.slotpool; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; import javax.annotation.Nonnull; -import java.util.Collection; import java.util.Optional; import java.util.function.Supplier; @@ -33,9 +34,10 @@ class DefaultLocationPreferenceSlotSelectionStrategy @Nonnull @Override protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference( - @Nonnull Collection<SlotInfoWithUtilization> availableSlots, + @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull ResourceProfile resourceProfile) { - for (SlotInfoWithUtilization candidate : availableSlots) { + for (AllocationID allocationId : freeSlotInfoTracker.getAvailableSlots()) { + SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId); if (candidate.getResourceProfile().isMatching(resourceProfile)) { return Optional.of(SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java index fdae956c95d..be8bd1cc1c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/EvenlySpreadOutLocationPreferenceSlotSelectionStrategy.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.jobmaster.slotpool; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import javax.annotation.Nonnull; -import java.util.Collection; import java.util.Comparator; import java.util.Optional; import java.util.function.Supplier; @@ -33,19 +33,22 @@ class EvenlySpreadOutLocationPreferenceSlotSelectionStrategy @Nonnull @Override protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference( - @Nonnull Collection<SlotInfoWithUtilization> availableSlots, + @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull ResourceProfile resourceProfile) { - return availableSlots.stream() - .filter( - slotInfoWithUtilization -> - slotInfoWithUtilization - .getResourceProfile() - .isMatching(resourceProfile)) - .min(Comparator.comparing(SlotInfoWithUtilization::getTaskExecutorUtilization)) + return freeSlotInfoTracker.getAvailableSlots().stream() + .map(freeSlotInfoTracker::getSlotInfo) + .filter(slotInfo -> slotInfo.getResourceProfile().isMatching(resourceProfile)) + // calculate utilization first to avoid duplicated calculation in min() .map( - slotInfoWithUtilization -> + slot -> + new Tuple2<>( + slot, freeSlotInfoTracker.getTaskExecutorUtilization(slot))) + .min(Comparator.comparingDouble(tuple -> tuple.f1)) + .map( + slotInfoWithTaskExecutorUtilization -> SlotInfoAndLocality.of( - slotInfoWithUtilization, Locality.UNCONSTRAINED)); + slotInfoWithTaskExecutorUtilization.f0, + Locality.UNCONSTRAINED)); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java index d5983ee6fa8..f811223d270 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java @@ -18,10 +18,12 @@ package org.apache.flink.runtime.jobmaster.slotpool; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.CollectionUtil; @@ -41,12 +43,11 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel @Override public Optional<SlotInfoAndLocality> selectBestSlotForProfile( - @Nonnull Collection<SlotInfoWithUtilization> availableSlots, - @Nonnull SlotProfile slotProfile) { + @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull SlotProfile slotProfile) { Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations(); - if (availableSlots.isEmpty()) { + if (freeSlotInfoTracker.getAvailableSlots().isEmpty()) { return Optional.empty(); } @@ -54,14 +55,14 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel // if we have no location preferences, we can only filter by the additional requirements. return locationPreferences.isEmpty() - ? selectWithoutLocationPreference(availableSlots, resourceProfile) + ? selectWithoutLocationPreference(freeSlotInfoTracker, resourceProfile) : selectWithLocationPreference( - availableSlots, locationPreferences, resourceProfile); + freeSlotInfoTracker, locationPreferences, resourceProfile); } @Nonnull private Optional<SlotInfoAndLocality> selectWithLocationPreference( - @Nonnull Collection<SlotInfoWithUtilization> availableSlots, + @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull Collection<TaskManagerLocation> locationPreferences, @Nonnull ResourceProfile resourceProfile) { @@ -77,11 +78,12 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum); } - SlotInfoWithUtilization bestCandidate = null; + SlotInfo bestCandidate = null; Locality bestCandidateLocality = Locality.UNKNOWN; double bestCandidateScore = Double.NEGATIVE_INFINITY; - for (SlotInfoWithUtilization candidate : availableSlots) { + for (AllocationID allocationId : freeSlotInfoTracker.getAvailableSlots()) { + SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId); if (candidate.getResourceProfile().isMatching(resourceProfile)) { @@ -97,7 +99,9 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel double candidateScore = calculateCandidateScore( - localWeigh, hostLocalWeigh, candidate::getTaskExecutorUtilization); + localWeigh, + hostLocalWeigh, + () -> freeSlotInfoTracker.getTaskExecutorUtilization(candidate)); if (candidateScore > bestCandidateScore) { bestCandidateScore = candidateScore; bestCandidate = candidate; @@ -117,7 +121,7 @@ public abstract class LocationPreferenceSlotSelectionStrategy implements SlotSel @Nonnull protected abstract Optional<SlotInfoAndLocality> selectWithoutLocationPreference( - @Nonnull Collection<SlotInfoWithUtilization> availableSlots, + @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull ResourceProfile resourceProfile); protected abstract double calculateCandidateScore( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java index 507aeef3473..508f72bfad2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java @@ -85,17 +85,20 @@ public class PhysicalSlotProviderImpl implements PhysicalSlotProvider { private Optional<PhysicalSlot> tryAllocateFromAvailable( SlotRequestId slotRequestId, SlotProfile slotProfile) { - Collection<SlotInfoWithUtilization> slotInfoList = slotPool.getAvailableSlotsInformation(); + FreeSlotInfoTracker freeSlotInfoTracker = slotPool.getFreeSlotInfoTracker(); Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot = - slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile); + slotSelectionStrategy.selectBestSlotForProfile(freeSlotInfoTracker, slotProfile); return selectedAvailableSlot.flatMap( - slotInfoAndLocality -> - slotPool.allocateAvailableSlot( - slotRequestId, - slotInfoAndLocality.getSlotInfo().getAllocationId(), - slotProfile.getPhysicalSlotResourceProfile())); + slotInfoAndLocality -> { + freeSlotInfoTracker.reserveSlot( + slotInfoAndLocality.getSlotInfo().getAllocationId()); + return slotPool.allocateAvailableSlot( + slotRequestId, + slotInfoAndLocality.getSlotInfo().getAllocationId(), + slotProfile.getPhysicalSlotResourceProfile()); + }); } private CompletableFuture<PhysicalSlot> requestNewSlot( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java index 66e6b9da4fa..3d8f48e6eb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java @@ -27,11 +27,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Optional; -import java.util.Set; /** * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and @@ -51,8 +48,7 @@ public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStr @Override public Optional<SlotInfoAndLocality> selectBestSlotForProfile( - @Nonnull Collection<SlotInfoWithUtilization> availableSlots, - @Nonnull SlotProfile slotProfile) { + @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull SlotProfile slotProfile) { LOG.debug("Select best slot for profile {}.", slotProfile); @@ -60,39 +56,21 @@ public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStr // First, if there was a prior allocation try to schedule to the same/old slot if (!priorAllocations.isEmpty()) { - for (SlotInfoWithUtilization availableSlot : availableSlots) { - if (priorAllocations.contains(availableSlot.getAllocationId())) { - return Optional.of(SlotInfoAndLocality.of(availableSlot, Locality.LOCAL)); + for (AllocationID availableSlot : freeSlotInfoTracker.getAvailableSlots()) { + if (priorAllocations.contains(availableSlot)) { + return Optional.of( + SlotInfoAndLocality.of( + freeSlotInfoTracker.getSlotInfo(availableSlot), + Locality.LOCAL)); } } } // Second, select based on location preference, excluding blacklisted allocations - Set<AllocationID> blackListedAllocations = slotProfile.getReservedAllocations(); - Collection<SlotInfoWithUtilization> availableAndAllowedSlots = - computeWithoutBlacklistedSlots(availableSlots, blackListedAllocations); return fallbackSlotSelectionStrategy.selectBestSlotForProfile( - availableAndAllowedSlots, slotProfile); - } - - @Nonnull - private Collection<SlotInfoWithUtilization> computeWithoutBlacklistedSlots( - @Nonnull Collection<SlotInfoWithUtilization> availableSlots, - @Nonnull Set<AllocationID> blacklistedAllocations) { - - if (blacklistedAllocations.isEmpty()) { - return Collections.unmodifiableCollection(availableSlots); - } - - ArrayList<SlotInfoWithUtilization> availableAndAllowedSlots = - new ArrayList<>(availableSlots.size()); - for (SlotInfoWithUtilization availableSlot : availableSlots) { - if (!blacklistedAllocations.contains(availableSlot.getAllocationId())) { - availableAndAllowedSlots.add(availableSlot); - } - } - - return availableAndAllowedSlots; + freeSlotInfoTracker.createNewFreeSlotInfoTrackerWithoutBlockedSlots( + slotProfile.getReservedAllocations()), + slotProfile); } public static PreviousAllocationSlotSelectionStrategy create() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 1bed443bef4..c9b01df298f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -124,6 +124,13 @@ public interface SlotPool extends AllocatedSlotActions, AutoCloseable { */ Collection<SlotInfoWithUtilization> getAvailableSlotsInformation(); + /** + * Returns all free slot tracker. + * + * @return all free slot tracker + */ + FreeSlotInfoTracker getFreeSlotInfoTracker(); + /** * Returns a list of {@link SlotInfo} objects about all slots that are currently allocated in * the slot pool. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java index 36b001016ae..968ec50791e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSelectionStrategy.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.jobmaster.SlotInfo; import javax.annotation.Nonnull; -import java.util.Collection; import java.util.Optional; /** Interface for slot selection strategies. */ @@ -35,14 +34,13 @@ public interface SlotSelectionStrategy { * of available slots and considering the given {@link SlotProfile} that describes the * requirements. * - * @param availableSlots a list of the available slots together with their remaining resources - * to select from. + * @param freeSlotInfoTracker a list of the available slots together with their remaining + * resources to select from. * @param slotProfile a slot profile, describing requirements for the slot selection. * @return the selected slot info with the corresponding locality hint. */ Optional<SlotInfoAndLocality> selectBestSlotForProfile( - @Nonnull Collection<SlotInfoWithUtilization> availableSlots, - @Nonnull SlotProfile slotProfile); + @Nonnull FreeSlotInfoTracker freeSlotInfoTracker, @Nonnull SlotProfile slotProfile); /** This class is a value type that combines a {@link SlotInfo} with a {@link Locality} hint. */ final class SlotInfoAndLocality { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java index bbae2412d44..c9fc0cd9f79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/LocationPreferenceSlotSelectionStrategyTest.java @@ -19,8 +19,9 @@ package org.apache.flink.runtime.clusterframework.types; import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTracker; import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy; -import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization; import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -32,7 +33,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Optional; -import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; @@ -166,8 +166,7 @@ class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionStrategyT } protected static void assertMatchingSlotEqualsToSlotInfo( - Optional<SlotSelectionStrategy.SlotInfoAndLocality> matchingSlot, - SlotInfoWithUtilization slotInfo) { + Optional<SlotSelectionStrategy.SlotInfoAndLocality> matchingSlot, SlotInfo slotInfo) { assertThat(matchingSlot) .hasValueSatisfying( slotInfoAndLocality -> @@ -177,15 +176,15 @@ class LocationPreferenceSlotSelectionStrategyTest extends SlotSelectionStrategyT protected static void assertMatchingSlotLocalityAndInCandidates( Optional<SlotSelectionStrategy.SlotInfoAndLocality> matchingSlot, Locality locality, - Set<SlotInfoWithUtilization> candidates) { + FreeSlotInfoTracker candidates) { assertThat(matchingSlot) .hasValueSatisfying( slotInfoAndLocality -> { assertThat(slotInfoAndLocality.getLocality()).isEqualTo(locality); - assertThat(candidates) + assertThat(candidates.getAvailableSlots()) .anySatisfy( - slotInfoWithUtilization -> - assertThat(slotInfoWithUtilization) + allocationId -> + assertThat(candidates.getSlotInfo(allocationId)) .isEqualTo( slotInfoAndLocality .getSlotInfo())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java index 51cb6c96817..805209d4245 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotSelectionStrategyTestBase.java @@ -21,15 +21,16 @@ package org.apache.flink.runtime.clusterframework.types; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTracker; +import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTrackerTestUtils; import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import java.net.InetAddress; -import java.util.Collections; -import java.util.HashSet; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; -import java.util.Set; /** Test base for {@link SlotSelectionStrategy}. */ abstract class SlotSelectionStrategyTestBase { @@ -56,35 +57,27 @@ abstract class SlotSelectionStrategyTestBase { protected final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - protected final SlotInfoWithUtilization slotInfo1 = - SlotInfoWithUtilization.from( - new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway, resourceProfile), - ignored -> 0.0d); - protected final SlotInfoWithUtilization slotInfo2 = - SlotInfoWithUtilization.from( - new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway, biggerResourceProfile), - ignored -> 0.0d); - protected final SlotInfoWithUtilization slotInfo3 = - SlotInfoWithUtilization.from( - new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, resourceProfile), - ignored -> 0.0d); - protected final SlotInfoWithUtilization slotInfo4 = - SlotInfoWithUtilization.from( - new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, resourceProfile), - ignored -> 0.0d); + protected final SlotInfo slotInfo1 = + new SimpleSlotContext(aid1, tml1, 1, taskManagerGateway, resourceProfile); + protected final SlotInfo slotInfo2 = + new SimpleSlotContext(aid2, tml2, 2, taskManagerGateway, biggerResourceProfile); + protected final SlotInfo slotInfo3 = + new SimpleSlotContext(aid3, tml3, 3, taskManagerGateway, resourceProfile); + protected final SlotInfo slotInfo4 = + new SimpleSlotContext(aid4, tml4, 4, taskManagerGateway, resourceProfile); - protected final Set<SlotInfoWithUtilization> candidates = - Collections.unmodifiableSet(createCandidates()); + protected final FreeSlotInfoTracker candidates = createCandidates(); protected SlotSelectionStrategy selectionStrategy; - private Set<SlotInfoWithUtilization> createCandidates() { - Set<SlotInfoWithUtilization> candidates = new HashSet<>(4); - candidates.add(slotInfo1); - candidates.add(slotInfo2); - candidates.add(slotInfo3); - candidates.add(slotInfo4); - return candidates; + private FreeSlotInfoTracker createCandidates() { + Map<AllocationID, SlotInfo> candidates = new HashMap<>(4); + + candidates.put(slotInfo1.getAllocationId(), slotInfo1); + candidates.put(slotInfo2.getAllocationId(), slotInfo2); + candidates.put(slotInfo3.getAllocationId(), slotInfo3); + candidates.put(slotInfo4.getAllocationId(), slotInfo4); + return FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(candidates); } protected Optional<SlotSelectionStrategy.SlotInfoAndLocality> runMatching( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 00ef90ac807..04dabcbb86e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -81,6 +81,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory; +import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTracker; +import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotInfoTrackerTestUtils; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; @@ -583,6 +585,15 @@ class JobMasterTest { return Collections.unmodifiableCollection(allSlotInfos); } + @Override + public FreeSlotInfoTracker getFreeSlotInfoTracker() { + Map<AllocationID, SlotInfo> freeSlots = + registeredSlots.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toMap(SlotInfo::getAllocationId, s -> s)); + return FreeSlotInfoTrackerTestUtils.createDefaultFreeSlotInfoTracker(freeSlots); + } + @Override public Collection<SlotInfo> getAllocatedSlotsInformation() { return Collections.emptyList(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java index b3c49bc8cb7..4f12a42956b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java @@ -66,6 +66,8 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { private final Supplier<Collection<SlotInfoWithUtilization>> getFreeSlotsInformationSupplier; + private final Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier; + private final Supplier<Collection<? extends SlotInfo>> getAllSlotsInformationSupplier; private final BiFunction<ResourceID, Exception, ResourceCounter> releaseSlotsFunction; @@ -104,6 +106,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { Collection<SlotOffer>> registerSlotsFunction, Supplier<Collection<SlotInfoWithUtilization>> getFreeSlotsInformationSupplier, + Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier, Supplier<Collection<? extends SlotInfo>> getAllSlotsInformationSupplier, BiFunction<ResourceID, Exception, ResourceCounter> releaseSlotsFunction, BiFunction<AllocationID, Exception, ResourceCounter> releaseSlotFunction, @@ -119,6 +122,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { this.offerSlotsFunction = offerSlotsFunction; this.registerSlotsFunction = registerSlotsFunction; this.getFreeSlotsInformationSupplier = getFreeSlotsInformationSupplier; + this.getFreeSlotInfoTrackerSupplier = getFreeSlotInfoTrackerSupplier; this.getAllSlotsInformationSupplier = getAllSlotsInformationSupplier; this.releaseSlotsFunction = releaseSlotsFunction; this.releaseSlotFunction = releaseSlotFunction; @@ -175,6 +179,11 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { return getFreeSlotsInformationSupplier.get(); } + @Override + public FreeSlotInfoTracker getFreeSlotInfoTracker() { + return getFreeSlotInfoTrackerSupplier.get(); + } + @Override public Collection<? extends SlotInfo> getAllSlotsInformation() { return getAllSlotsInformationSupplier.get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java index 99a02324b29..fb27a5797e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java @@ -58,6 +58,8 @@ public class TestingDeclarativeSlotPoolBuilder { Collections::emptyList; private Supplier<Collection<? extends SlotInfo>> getAllSlotsInformationSupplier = Collections::emptyList; + private Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier = + () -> TestingFreeSlotInfoTracker.newBuilder().build(); private BiFunction<ResourceID, Exception, ResourceCounter> releaseSlotsFunction = (ignoredA, ignoredB) -> ResourceCounter.empty(); private BiFunction<AllocationID, Exception, ResourceCounter> releaseSlotFunction = @@ -133,6 +135,12 @@ public class TestingDeclarativeSlotPoolBuilder { return this; } + public TestingDeclarativeSlotPoolBuilder setGetFreeSlotInfoTrackerSupplier( + Supplier<FreeSlotInfoTracker> getFreeSlotInfoTrackerSupplier) { + this.getFreeSlotInfoTrackerSupplier = getFreeSlotInfoTrackerSupplier; + return this; + } + public TestingDeclarativeSlotPoolBuilder setGetAllSlotsInformationSupplier( Supplier<Collection<? extends SlotInfo>> getAllSlotsInformationSupplier) { this.getAllSlotsInformationSupplier = getAllSlotsInformationSupplier; @@ -190,6 +198,7 @@ public class TestingDeclarativeSlotPoolBuilder { offerSlotsFunction, registerSlotsFunction, getFreeSlotsInformationSupplier, + getFreeSlotInfoTrackerSupplier, getAllSlotsInformationSupplier, releaseSlotsFunction, releaseSlotFunction,
