This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3eae179cd26407ec12c1f9c3cba22fc562ecdc3a Author: Zhu Zhu <[email protected]> AuthorDate: Fri Mar 12 20:08:53 2021 +0800 [hotfix][runtime] Rename `previousExecutionGraphAllocations` in SlotProfile to `reservedAllocations` to avoid confusion --- .../clusterframework/types/SlotProfile.java | 24 ++++++++++++---------- .../PreviousAllocationSlotSelectionStrategy.java | 3 +-- .../MergingSharedSlotProfileRetrieverTest.java | 4 ++-- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java index d818c67..e57a089 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.clusterframework.types; import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import java.util.Collection; @@ -47,21 +48,21 @@ public class SlotProfile { /** This contains desired allocation ids of the slot. */ private final Collection<AllocationID> preferredAllocations; - /** This contains all prior allocation ids from the whole execution graph. */ - private final Set<AllocationID> previousExecutionGraphAllocations; + /** This contains all reserved allocation ids from the whole execution graph. 
*/ + private final Set<AllocationID> reservedAllocations; private SlotProfile( final ResourceProfile taskResourceProfile, final ResourceProfile physicalSlotResourceProfile, final Collection<TaskManagerLocation> preferredLocations, final Collection<AllocationID> preferredAllocations, - final Set<AllocationID> previousExecutionGraphAllocations) { + final Set<AllocationID> reservedAllocations) { this.taskResourceProfile = checkNotNull(taskResourceProfile); this.physicalSlotResourceProfile = checkNotNull(physicalSlotResourceProfile); this.preferredLocations = checkNotNull(preferredLocations); this.preferredAllocations = checkNotNull(preferredAllocations); - this.previousExecutionGraphAllocations = checkNotNull(previousExecutionGraphAllocations); + this.reservedAllocations = checkNotNull(reservedAllocations); } /** Returns the desired resource profile for the task slot. */ @@ -85,12 +86,14 @@ public class SlotProfile { } /** - * Returns a set of all previous allocation ids from the execution graph. + * Returns a set of all reserved allocation ids from the execution graph. It will be used by {@link + * PreviousAllocationSlotSelectionStrategy} to support local recovery. In this case, a vertex + * cannot take a reserved allocation unless it exactly prefers that allocation. * * <p>This is optional and can be empty if unused. 
*/ - public Set<AllocationID> getPreviousExecutionGraphAllocations() { - return previousExecutionGraphAllocations; + public Set<AllocationID> getReservedAllocations() { + return reservedAllocations; } /** @@ -102,8 +105,7 @@ public class SlotProfile { * host this task slot * @param preferredLocations specifying the preferred locations * @param priorAllocations specifying the prior allocations - * @param previousExecutionGraphAllocations specifying all prior allocation ids from the whole - * execution graph + * @param reservedAllocations specifying all reserved allocations * @return Slot profile with all the given information */ public static SlotProfile priorAllocation( @@ -111,13 +113,13 @@ public class SlotProfile { final ResourceProfile physicalSlotResourceProfile, final Collection<TaskManagerLocation> preferredLocations, final Collection<AllocationID> priorAllocations, - final Set<AllocationID> previousExecutionGraphAllocations) { + final Set<AllocationID> reservedAllocations) { return new SlotProfile( taskResourceProfile, physicalSlotResourceProfile, preferredLocations, priorAllocations, - previousExecutionGraphAllocations); + reservedAllocations); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java index a5c21db..6a8a832 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java @@ -61,8 +61,7 @@ public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStr } // Second, select based on location preference, excluding blacklisted allocations - Set<AllocationID> blackListedAllocations = - slotProfile.getPreviousExecutionGraphAllocations(); + 
Set<AllocationID> blackListedAllocations = slotProfile.getReservedAllocations(); Collection<SlotInfoAndResources> availableAndAllowedSlots = computeWithoutBlacklistedSlots(availableSlots, blackListedAllocations); return fallbackSlotSelectionStrategy.selectBestSlotForProfile( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java index cb07728..7102a28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java @@ -72,7 +72,7 @@ public class MergingSharedSlotProfileRetrieverTest { assertThat(slotProfile.getPhysicalSlotResourceProfile(), is(ResourceProfile.ZERO)); assertThat(slotProfile.getPreferredLocations(), hasSize(0)); assertThat(slotProfile.getPreferredAllocations(), hasSize(0)); - assertThat(slotProfile.getPreviousExecutionGraphAllocations(), hasSize(0)); + assertThat(slotProfile.getReservedAllocations(), hasSize(0)); } @Test @@ -147,7 +147,7 @@ public class MergingSharedSlotProfileRetrieverTest { slotProfile.getPreferredAllocations(), containsInAnyOrder(prevAllocationID1, prevAllocationID2)); assertThat( - slotProfile.getPreviousExecutionGraphAllocations(), + slotProfile.getReservedAllocations(), containsInAnyOrder(prevAllocationIDs.toArray())); }
