This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ce55b21d53c29e37523c82ee001f4ecf968ad84e
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Mar 15 17:23:27 2021 +0100

    [FLINK-21602] Let SlotSharingSlotAllocator check whether resources are 
available before reserving them
    
    The SlotSharingSlotAllocator.tryReserve now checks whether the expected set 
of resources is still available
    before reserving slots. If the resources are no longer available, then 
tryReserve will return Optional.empty().
---
 .../scheduler/adaptive/AdaptiveScheduler.java      |  3 +-
 .../allocator/IsSlotAvailableAndFreeFunction.java  | 35 ++++++++++++
 .../allocator/SlotSharingSlotAllocator.java        | 62 +++++++++++++++++-----
 .../allocator/SlotSharingSlotAllocatorTest.java    | 48 ++++++++++++++---
 4 files changed, 128 insertions(+), 20 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 9b6799d..c1d0caa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -257,7 +257,8 @@ public class AdaptiveScheduler
         this.slotAllocator =
                 new SlotSharingSlotAllocator(
                         declarativeSlotPool::reserveFreeSlot,
-                        declarativeSlotPool::freeReservedSlot);
+                        declarativeSlotPool::freeReservedSlot,
+                        declarativeSlotPool::containsFreeSlot);
 
         
declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/IsSlotAvailableAndFreeFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/IsSlotAvailableAndFreeFunction.java
new file mode 100644
index 0000000..4bb5538
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/IsSlotAvailableAndFreeFunction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/** Functional interface for checking whether a slot is available and free. */
+@FunctionalInterface
+public interface IsSlotAvailableAndFreeFunction {
+
+    /**
+     * Returns {@code true} if a slot with the given {@link AllocationID} is 
available and free.
+     *
+     * @param allocationId allocationId specifies the slot to check
+     * @return {@code true} if a slot with the given allocationId is available 
and free; otherwise
+     *     {@code false}
+     */
+    boolean isSlotAvailableAndFree(AllocationID allocationId);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index da5f7a3..fc72a45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -29,6 +30,8 @@ import 
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -45,11 +48,15 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
 
     private final ReserveSlotFunction reserveSlotFunction;
     private final FreeSlotFunction freeSlotFunction;
+    private final IsSlotAvailableAndFreeFunction 
isSlotAvailableAndFreeFunction;
 
     public SlotSharingSlotAllocator(
-            ReserveSlotFunction reserveSlot, FreeSlotFunction 
freeSlotFunction) {
+            ReserveSlotFunction reserveSlot,
+            FreeSlotFunction freeSlotFunction,
+            IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
         this.reserveSlotFunction = reserveSlot;
         this.freeSlotFunction = freeSlotFunction;
+        this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
     }
 
     @Override
@@ -159,23 +166,52 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         final VertexParallelismWithSlotSharing 
vertexParallelismWithSlotSharing =
                 (VertexParallelismWithSlotSharing) vertexParallelism;
 
-        final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new 
HashMap<>();
+        final Collection<AllocationID> expectedSlots =
+                
calculateExpectedSlots(vertexParallelismWithSlotSharing.getAssignments());
+
+        if (areAllExpectedSlotsAvailableAndFree(expectedSlots)) {
+            final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new 
HashMap<>();
+
+            for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :
+                    vertexParallelismWithSlotSharing.getAssignments()) {
+                final SharedSlot sharedSlot =
+                        
reserveSharedSlot(executionSlotSharingGroup.getSlotInfo());
+
+                for (ExecutionVertexID executionVertexId :
+                        executionSlotSharingGroup
+                                .getExecutionSlotSharingGroup()
+                                .getContainedExecutionVertices()) {
+                    final LogicalSlot logicalSlot = 
sharedSlot.allocateLogicalSlot();
+                    assignedSlots.put(executionVertexId, logicalSlot);
+                }
+            }
+
+            return Optional.of(ReservedSlots.create(assignedSlots));
+        } else {
+            return Optional.empty();
+        }
+    }
 
-        for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :
-                vertexParallelismWithSlotSharing.getAssignments()) {
-            final SharedSlot sharedSlot =
-                    reserveSharedSlot(executionSlotSharingGroup.getSlotInfo());
+    @Nonnull
+    private Collection<AllocationID> calculateExpectedSlots(
+            Iterable<? extends ExecutionSlotSharingGroupAndSlot> assignments) {
+        final Collection<AllocationID> requiredSlots = new ArrayList<>();
+
+        for (ExecutionSlotSharingGroupAndSlot assignment : assignments) {
+            requiredSlots.add(assignment.getSlotInfo().getAllocationId());
+        }
+        return requiredSlots;
+    }
 
-            for (ExecutionVertexID executionVertexId :
-                    executionSlotSharingGroup
-                            .getExecutionSlotSharingGroup()
-                            .getContainedExecutionVertices()) {
-                final LogicalSlot logicalSlot = 
sharedSlot.allocateLogicalSlot();
-                assignedSlots.put(executionVertexId, logicalSlot);
+    private boolean areAllExpectedSlotsAvailableAndFree(
+            Iterable<? extends AllocationID> requiredSlots) {
+        for (AllocationID requiredSlot : requiredSlots) {
+            if 
(!isSlotAvailableAndFreeFunction.isSlotAvailableAndFree(requiredSlot)) {
+                return false;
             }
         }
 
-        return Optional.of(ReservedSlots.create(assignedSlots));
+        return true;
     }
 
     private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index b0e3716..22b8ddd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 
 /** Tests for the {@link SlotSharingSlotAllocator}. */
@@ -52,6 +53,8 @@ public class SlotSharingSlotAllocatorTest extends TestLogger {
                             .withAllocationID(allocationId)
                             .withResourceProfile(resourceProfile)
                             .build();
+    private static final IsSlotAvailableAndFreeFunction 
TEST_IS_SLOT_FREE_FUNCTION =
+            ignored -> true;
 
     private static final SlotSharingGroup slotSharingGroup1 = new 
SlotSharingGroup();
     private static final SlotSharingGroup slotSharingGroup2 = new 
SlotSharingGroup();
@@ -65,7 +68,10 @@ public class SlotSharingSlotAllocatorTest extends TestLogger 
{
     @Test
     public void testCalculateRequiredSlots() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+                new SlotSharingSlotAllocator(
+                        TEST_RESERVE_SLOT_FUNCTION,
+                        TEST_FREE_SLOT_FUNCTION,
+                        TEST_IS_SLOT_FREE_FUNCTION);
 
         final ResourceCounter resourceCounter =
                 slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, 
vertex2, vertex3));
@@ -81,7 +87,10 @@ public class SlotSharingSlotAllocatorTest extends TestLogger 
{
     @Test
     public void testDetermineParallelismWithMinimumSlots() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+                new SlotSharingSlotAllocator(
+                        TEST_RESERVE_SLOT_FUNCTION,
+                        TEST_FREE_SLOT_FUNCTION,
+                        TEST_IS_SLOT_FREE_FUNCTION);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
@@ -100,7 +109,10 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
     @Test
     public void testDetermineParallelismWithManySlots() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+                new SlotSharingSlotAllocator(
+                        TEST_RESERVE_SLOT_FUNCTION,
+                        TEST_FREE_SLOT_FUNCTION,
+                        TEST_IS_SLOT_FREE_FUNCTION);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
@@ -125,7 +137,10 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
     @Test
     public void 
testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+                new SlotSharingSlotAllocator(
+                        TEST_RESERVE_SLOT_FUNCTION,
+                        TEST_FREE_SLOT_FUNCTION,
+                        TEST_IS_SLOT_FREE_FUNCTION);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
@@ -137,9 +152,12 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
     }
 
     @Test
-    public void testReserveResources() {
+    public void testReserveAvailableResources() {
         final SlotSharingSlotAllocator slotAllocator =
-                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+                new SlotSharingSlotAllocator(
+                        TEST_RESERVE_SLOT_FUNCTION,
+                        TEST_FREE_SLOT_FUNCTION,
+                        TEST_IS_SLOT_FREE_FUNCTION);
 
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
@@ -172,6 +190,24 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
         }
     }
 
+    @Test
+    public void testReserveUnavailableResources() {
+        final SlotSharingSlotAllocator slotSharingSlotAllocator =
+                new SlotSharingSlotAllocator(
+                        TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, 
ignored -> false);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final VertexParallelismWithSlotSharing slotAssignments =
+                slotSharingSlotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
+
+        final Optional<? extends ReservedSlots> reservedSlots =
+                slotSharingSlotAllocator.tryReserveResources(slotAssignments);
+
+        assertFalse(reservedSlots.isPresent());
+    }
+
     private static Collection<SlotInfo> getSlots(int count) {
         final Collection<SlotInfo> slotInfo = new ArrayList<>();
         for (int i = 0; i < count; i++) {

Reply via email to