This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f4813d77ade0544c39d8d9edd58606d3b21bc7b
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Mar 14 17:58:08 2021 +0100

    [FLINK-21602] Make SlotAllocator.tryReserveResources failable
    
    In order to prepare the SlotAllocator to support failing slot
    reservations, this commit renames SlotAllocator.reserveResources to
    SlotAllocator.tryReserveResources and changes its return type from
    Map<ExecutionVertexID, LogicalSlot> to Optional<ReservedSlots>.
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 15 +++++---
 .../ParallelismAndResourceAssignments.java         | 11 +++---
 .../adaptive/allocator/ReservedSlots.java          | 42 ++++++++++++++++++++++
 .../adaptive/allocator/SlotAllocator.java          | 18 ++++------
 .../allocator/SlotSharingSlotAllocator.java        |  5 ++-
 .../adaptive/allocator/VertexParallelism.java      |  2 +-
 .../allocator/SlotSharingSlotAllocatorTest.java    |  9 +++--
 7 files changed, 73 insertions(+), 29 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index f420ce5..9b6799d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -94,12 +94,12 @@ import 
org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerUtils;
 import 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
 import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ReactiveScaleUpController;
 import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ScaleUpController;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.util.ResourceCounter;
@@ -121,7 +121,6 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -594,11 +593,17 @@ public class AdaptiveScheduler
                                                 jobInformation.getJobID(),
                                                 "Not enough resources 
available for scheduling."));
 
-        final Map<ExecutionVertexID, LogicalSlot> slotAssignments =
-                slotAllocator.reserveResources(vertexParallelism);
+        final ReservedSlots reservedSlots =
+                slotAllocator
+                        .tryReserveResources(vertexParallelism)
+                        .orElseThrow(
+                                () ->
+                                        new JobExecutionException(
+                                                jobInformation.getJobID(),
+                                                "Could not reserve all 
required slots."));
 
         return new ParallelismAndResourceAssignments(
-                slotAssignments, 
vertexParallelism.getMaxParallelismForVertices());
+                reservedSlots, 
vertexParallelism.getMaxParallelismForVertices());
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java
index 3c4b2dd..febf3aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ParallelismAndResourceAssignments.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.util.Preconditions;
 
@@ -27,14 +28,13 @@ import java.util.Map;
 
 /** Assignment of slots to execution vertices. */
 public final class ParallelismAndResourceAssignments {
-    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;
+    private final ReservedSlots reservedSlots;
 
     private final Map<JobVertexID, Integer> parallelismPerJobVertex;
 
     public ParallelismAndResourceAssignments(
-            Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots,
-            Map<JobVertexID, Integer> parallelismPerJobVertex) {
-        this.assignedSlots = assignedSlots;
+            ReservedSlots reservedSlots, Map<JobVertexID, Integer> 
parallelismPerJobVertex) {
+        this.reservedSlots = reservedSlots;
         this.parallelismPerJobVertex = parallelismPerJobVertex;
     }
 
@@ -44,7 +44,6 @@ public final class ParallelismAndResourceAssignments {
     }
 
     public LogicalSlot getAssignedSlot(ExecutionVertexID executionVertexId) {
-        Preconditions.checkState(assignedSlots.containsKey(executionVertexId));
-        return assignedSlots.get(executionVertexId);
+        return reservedSlots.getSlotFor(executionVertexId);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/ReservedSlots.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/ReservedSlots.java
new file mode 100644
index 0000000..0bb9bbd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/ReservedSlots.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+/** Container for the set of reserved slots for {@link ExecutionVertexID}. */
+public final class ReservedSlots {
+    private final Map<ExecutionVertexID, LogicalSlot> slotPerExecutionVertex;
+
+    private ReservedSlots(Map<ExecutionVertexID, LogicalSlot> 
slotPerExecutionVertex) {
+        this.slotPerExecutionVertex = slotPerExecutionVertex;
+    }
+
+    public LogicalSlot getSlotFor(ExecutionVertexID executionVertexId) {
+        return 
Preconditions.checkNotNull(slotPerExecutionVertex.get(executionVertexId));
+    }
+
+    public static ReservedSlots create(Map<ExecutionVertexID, LogicalSlot> 
assignedSlots) {
+        return new ReservedSlots(assignedSlots);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
index 2519db8..9112b52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
@@ -17,13 +17,10 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
 
 import java.util.Collection;
-import java.util.Map;
 import java.util.Optional;
 
 /** Component for calculating the slot requirements and mapping of vertices to 
slots. */
@@ -45,12 +42,8 @@ public interface SlotAllocator {
      * <p>If a {@link VertexParallelism} is returned then it covers all 
vertices contained in the
      * given job information.
      *
-     * <p>A returned {@link VertexParallelism} should be directly consumed 
afterwards (by either
-     * discarding it or calling {@link #reserveResources(VertexParallelism)}, 
as there is no
-     * guarantee that the assignment remains valid over time (because slots 
can be lost).
-     *
      * <p>Implementations of this method must be side-effect free. There is no 
guarantee that the
-     * result of this method is ever passed to {@link 
#reserveResources(VertexParallelism)}.
+     * result of this method is ever passed to {@link 
#tryReserveResources(VertexParallelism)}.
      *
      * @param jobInformation information about the job graph
      * @param slots slots to consider for determining the parallelism
@@ -62,10 +55,13 @@ public interface SlotAllocator {
             JobInformation jobInformation, Collection<? extends SlotInfo> 
slots);
 
     /**
-     * Reserves slots according to the given assignment.
+     * Reserves slots according to the given assignment if possible. If the 
underlying set of
+     * resources has changed and the reservation with respect to 
vertexParallelism is no longer
+     * possible, then this method returns {@link Optional#empty()}.
      *
      * @param vertexParallelism information on how slots should be assigned to 
the slots
-     * @return mapping of vertices to slots
+     * @return Set of reserved slots if the reservation was successful; 
otherwise {@link
+     *     Optional#empty()}
      */
-    Map<ExecutionVertexID, LogicalSlot> reserveResources(VertexParallelism 
vertexParallelism);
+    Optional<ReservedSlots> tryReserveResources(VertexParallelism 
vertexParallelism);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 26853cf..da5f7a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -148,8 +148,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     }
 
     @Override
-    public Map<ExecutionVertexID, LogicalSlot> reserveResources(
-            VertexParallelism vertexParallelism) {
+    public Optional<ReservedSlots> tryReserveResources(VertexParallelism 
vertexParallelism) {
         Preconditions.checkArgument(
                 vertexParallelism instanceof VertexParallelismWithSlotSharing,
                 String.format(
@@ -176,7 +175,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             }
         }
 
-        return assignedSlots;
+        return Optional.of(ReservedSlots.create(assignedSlots));
     }
 
     private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
index 99e8fd6..d4be287 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
@@ -27,7 +27,7 @@ import java.util.Map;
  * the parallelism each vertex could be scheduled with.
  *
  * <p>{@link SlotAllocator} implementations may encode additional information 
to be used in {@link
- * SlotAllocator#reserveResources(VertexParallelism)}.
+ * SlotAllocator#tryReserveResources(VertexParallelism)}.
  */
 public interface VertexParallelism {
     Map<JobVertexID, Integer> getMaxParallelismForVertices();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 80c0618..b0e3716 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -147,8 +147,11 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
         final VertexParallelismWithSlotSharing slotAssignments =
                 slotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
 
-        final Map<ExecutionVertexID, LogicalSlot> assignedResources =
-                slotAllocator.reserveResources(slotAssignments);
+        final ReservedSlots reservedSlots =
+                slotAllocator
+                        .tryReserveResources(slotAssignments)
+                        .orElseThrow(
+                                () -> new RuntimeException("Expected that 
reservation succeeds."));
 
         final Map<ExecutionVertexID, SlotInfo> expectedAssignments = new 
HashMap<>();
         for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot 
assignment :
@@ -161,7 +164,7 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
 
         for (Map.Entry<ExecutionVertexID, SlotInfo> expectedAssignment :
                 expectedAssignments.entrySet()) {
-            final LogicalSlot assignedSlot = 
assignedResources.get(expectedAssignment.getKey());
+            final LogicalSlot assignedSlot = 
reservedSlots.getSlotFor(expectedAssignment.getKey());
 
             final SlotInfo backingSlot = expectedAssignment.getValue();
 

Reply via email to