This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2f4813d77ade0544c39d8d9edd58606d3b21bc7b Author: Till Rohrmann <[email protected]> AuthorDate: Sun Mar 14 17:58:08 2021 +0100 [FLINK-21602] Make SlotAllocator.tryReserveSlots failable In order to prepare the SlotAllocator to support failing slot reservations, this commit replaces SlotAllocator.reserveResources with SlotAllocator.tryReserveResources, which returns Optional<ReservedSlots>. --- .../scheduler/adaptive/AdaptiveScheduler.java | 15 +++++--- .../ParallelismAndResourceAssignments.java | 11 +++--- .../adaptive/allocator/ReservedSlots.java | 42 ++++++++++++++++++++++ .../adaptive/allocator/SlotAllocator.java | 18 ++++------ .../allocator/SlotSharingSlotAllocator.java | 5 ++- .../adaptive/allocator/VertexParallelism.java | 2 +- .../allocator/SlotSharingSlotAllocatorTest.java | 9 +++-- 7 files changed, 73 insertions(+), 29 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index f420ce5..9b6799d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -94,12 +94,12 @@ import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.SchedulerUtils; import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener; +import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ReactiveScaleUpController; import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ScaleUpController; -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.util.ResourceCounter; @@ -121,7 +121,6 @@ import java.time.Duration; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -594,11 +593,17 @@ public class AdaptiveScheduler jobInformation.getJobID(), "Not enough resources available for scheduling.")); - final Map<ExecutionVertexID, LogicalSlot> slotAssignments = - slotAllocator.reserveResources(vertexParallelism); + final ReservedSlots reservedSlots = + slotAllocator + .tryReserveResources(vertexParallelism) + .orElseThrow( + () -> + new JobExecutionException( + jobInformation.getJobID(), + "Could not reserve all required slots.")); return new ParallelismAndResourceAssignments( - slotAssignments, vertexParallelism.getMaxParallelismForVertices()); + reservedSlots, vertexParallelism.getMaxParallelismForVertices()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java index 3c4b2dd..febf3aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.LogicalSlot; +import 
org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.util.Preconditions; @@ -27,14 +28,13 @@ import java.util.Map; /** Assignment of slots to execution vertices. */ public final class ParallelismAndResourceAssignments { - private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots; + private final ReservedSlots reservedSlots; private final Map<JobVertexID, Integer> parallelismPerJobVertex; public ParallelismAndResourceAssignments( - Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots, - Map<JobVertexID, Integer> parallelismPerJobVertex) { - this.assignedSlots = assignedSlots; + ReservedSlots reservedSlots, Map<JobVertexID, Integer> parallelismPerJobVertex) { + this.reservedSlots = reservedSlots; this.parallelismPerJobVertex = parallelismPerJobVertex; } @@ -44,7 +44,6 @@ public final class ParallelismAndResourceAssignments { } public LogicalSlot getAssignedSlot(ExecutionVertexID executionVertexId) { - Preconditions.checkState(assignedSlots.containsKey(executionVertexId)); - return assignedSlots.get(executionVertexId); + return reservedSlots.getSlotFor(executionVertexId); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/ReservedSlots.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/ReservedSlots.java new file mode 100644 index 0000000..0bb9bbd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/ReservedSlots.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.Preconditions; + +import java.util.Map; + +/** Container for the set of reserved slots for {@link ExecutionVertexID}. */ +public final class ReservedSlots { + private final Map<ExecutionVertexID, LogicalSlot> slotPerExecutionVertex; + + private ReservedSlots(Map<ExecutionVertexID, LogicalSlot> slotPerExecutionVertex) { + this.slotPerExecutionVertex = slotPerExecutionVertex; + } + + public LogicalSlot getSlotFor(ExecutionVertexID executionVertexId) { + return Preconditions.checkNotNull(slotPerExecutionVertex.get(executionVertexId)); + } + + public static ReservedSlots create(Map<ExecutionVertexID, LogicalSlot> assignedSlots) { + return new ReservedSlots(assignedSlots); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java index 2519db8..9112b52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java @@ -17,13 +17,10 @@ package 
org.apache.flink.runtime.scheduler.adaptive.allocator; -import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotInfo; -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.util.ResourceCounter; import java.util.Collection; -import java.util.Map; import java.util.Optional; /** Component for calculating the slot requirements and mapping of vertices to slots. */ @@ -45,12 +42,8 @@ public interface SlotAllocator { * <p>If a {@link VertexParallelism} is returned then it covers all vertices contained in the * given job information. * - * <p>A returned {@link VertexParallelism} should be directly consumed afterwards (by either - * discarding it or calling {@link #reserveResources(VertexParallelism)}, as there is no - * guarantee that the assignment remains valid over time (because slots can be lost). - * * <p>Implementations of this method must be side-effect free. There is no guarantee that the - * result of this method is ever passed to {@link #reserveResources(VertexParallelism)}. + * result of this method is ever passed to {@link #tryReserveResources(VertexParallelism)}. * * @param jobInformation information about the job graph * @param slots slots to consider for determining the parallelism @@ -62,10 +55,13 @@ public interface SlotAllocator { JobInformation jobInformation, Collection<? extends SlotInfo> slots); /** - * Reserves slots according to the given assignment. + * Reserves slots according to the given assignment if possible. If the underlying set of + * resources has changed and the reservation with respect to vertexParallelism is no longer + * possible, then this method returns {@link Optional#empty()}. 
* * @param vertexParallelism information on how slots should be assigned to the slots - * @return mapping of vertices to slots + * @return Set of reserved slots if the reservation was successful; otherwise {@link + * Optional#empty()} */ - Map<ExecutionVertexID, LogicalSlot> reserveResources(VertexParallelism vertexParallelism); + Optional<ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java index 26853cf..da5f7a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java @@ -148,8 +148,7 @@ public class SlotSharingSlotAllocator implements SlotAllocator { } @Override - public Map<ExecutionVertexID, LogicalSlot> reserveResources( - VertexParallelism vertexParallelism) { + public Optional<ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism) { Preconditions.checkArgument( vertexParallelism instanceof VertexParallelismWithSlotSharing, String.format( @@ -176,7 +175,7 @@ public class SlotSharingSlotAllocator implements SlotAllocator { } } - return assignedSlots; + return Optional.of(ReservedSlots.create(assignedSlots)); } private SharedSlot reserveSharedSlot(SlotInfo slotInfo) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java index 99e8fd6..d4be287 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java @@ -27,7 +27,7 @@ import java.util.Map; * the parallelism each vertex could be scheduled with. * * <p>{@link SlotAllocator} implementations may encode additional information to be used in {@link - * SlotAllocator#reserveResources(VertexParallelism)}. + * SlotAllocator#tryReserveResources(VertexParallelism)}. */ public interface VertexParallelism { Map<JobVertexID, Integer> getMaxParallelismForVertices(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java index 80c0618..b0e3716 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java @@ -147,8 +147,11 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { final VertexParallelismWithSlotSharing slotAssignments = slotAllocator.determineParallelism(jobInformation, getSlots(50)).get(); - final Map<ExecutionVertexID, LogicalSlot> assignedResources = - slotAllocator.reserveResources(slotAssignments); + final ReservedSlots reservedSlots = + slotAllocator + .tryReserveResources(slotAssignments) + .orElseThrow( + () -> new RuntimeException("Expected that reservation succeeds.")); final Map<ExecutionVertexID, SlotInfo> expectedAssignments = new HashMap<>(); for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot assignment : @@ -161,7 +164,7 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { for (Map.Entry<ExecutionVertexID, SlotInfo> expectedAssignment : expectedAssignments.entrySet()) { - final LogicalSlot assignedSlot = assignedResources.get(expectedAssignment.getKey()); + final LogicalSlot assignedSlot 
= reservedSlots.getSlotFor(expectedAssignment.getKey()); final SlotInfo backingSlot = expectedAssignment.getValue();
