This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d9eb50862624cb2af2d77e747550e8ce28908bd
Author: Zhu Zhu <[email protected]>
AuthorDate: Thu Jun 11 22:50:00 2020 +0800

    [FLINK-17018][runtime] Extract common logics of 
DefaultExecutionSlotAllocator into AbstractExecutionSlotAllocator
---
 .../scheduler/AbstractExecutionSlotAllocator.java  | 131 +++++++++++++++
 .../scheduler/DefaultExecutionSlotAllocator.java   | 132 +++++----------
 .../AbstractExecutionSlotAllocatorTest.java        | 178 +++++++++++++++++++++
 .../DefaultExecutionSlotAllocatorTest.java         | 128 ++++-----------
 4 files changed, 374 insertions(+), 195 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
new file mode 100644
index 0000000..d8dbf3b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for all {@link ExecutionSlotAllocator}. It is responsible to 
allocate slots for tasks and
+ * keep the unfulfilled slot requests for further cancellation.
+ */
+abstract class AbstractExecutionSlotAllocator implements 
ExecutionSlotAllocator {
+
+       private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
pendingSlotAssignments;
+
+       private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+       AbstractExecutionSlotAllocator(final PreferredLocationsRetriever 
preferredLocationsRetriever) {
+               this.preferredLocationsRetriever = 
checkNotNull(preferredLocationsRetriever);
+               this.pendingSlotAssignments = new HashMap<>();
+       }
+
+       @Override
+       public void cancel(final ExecutionVertexID executionVertexId) {
+               final SlotExecutionVertexAssignment 
slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId);
+               if (slotExecutionVertexAssignment != null) {
+                       
slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+               }
+       }
+
+       void validateSchedulingRequirements(final 
Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
+               schedulingRequirements.stream()
+                       
.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+                       .forEach(id -> checkState(
+                               !pendingSlotAssignments.containsKey(id),
+                               "BUG: vertex %s tries to allocate a slot when 
its previous slot request is still pending", id));
+       }
+
+       SlotExecutionVertexAssignment 
createAndRegisterSlotExecutionVertexAssignment(
+                       final ExecutionVertexID executionVertexId,
+                       final CompletableFuture<LogicalSlot> logicalSlotFuture,
+                       final Consumer<Throwable> slotRequestFailureHandler) {
+
+               final SlotExecutionVertexAssignment 
slotExecutionVertexAssignment =
+                       new SlotExecutionVertexAssignment(executionVertexId, 
logicalSlotFuture);
+
+               // add to map first in case the slot future is already completed
+               pendingSlotAssignments.put(executionVertexId, 
slotExecutionVertexAssignment);
+
+               logicalSlotFuture.whenComplete(
+                       (ignored, throwable) -> {
+                               
pendingSlotAssignments.remove(executionVertexId);
+                               if (throwable != null) {
+                                       
slotRequestFailureHandler.accept(throwable);
+                               }
+                       });
+
+               return slotExecutionVertexAssignment;
+       }
+
+       CompletableFuture<SlotProfile> getSlotProfileFuture(
+                       final ExecutionVertexSchedulingRequirements 
schedulingRequirements,
+                       final ResourceProfile physicalSlotResourceProfile,
+                       final Set<ExecutionVertexID> producersToIgnore,
+                       final Set<AllocationID> allPreviousAllocationIds) {
+
+               final CompletableFuture<Collection<TaskManagerLocation>> 
preferredLocationsFuture =
+                       preferredLocationsRetriever.getPreferredLocations(
+                               schedulingRequirements.getExecutionVertexId(),
+                               producersToIgnore);
+
+               return preferredLocationsFuture.thenApply(
+                       preferredLocations ->
+                               SlotProfile.priorAllocation(
+                                       
schedulingRequirements.getTaskResourceProfile(),
+                                       physicalSlotResourceProfile,
+                                       preferredLocations,
+                                       
Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
+                                       allPreviousAllocationIds));
+       }
+
+       @VisibleForTesting
+       static Set<AllocationID> computeAllPriorAllocationIds(
+                       final Collection<ExecutionVertexSchedulingRequirements> 
executionVertexSchedulingRequirements) {
+
+               return executionVertexSchedulingRequirements
+                       .stream()
+                       
.map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)
+                       .filter(Objects::nonNull)
+                       .collect(Collectors.toSet());
+       }
+
+       @VisibleForTesting
+       Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
getPendingSlotAssignments() {
+               return Collections.unmodifiableMap(pendingSlotAssignments);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
index c7b2dd9..07da10e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
@@ -28,49 +27,34 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Default {@link ExecutionSlotAllocator} which will use {@link SlotProvider} 
to allocate slots and
  * keep the unfulfilled requests for further cancellation.
  */
-public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator {
+public class DefaultExecutionSlotAllocator extends 
AbstractExecutionSlotAllocator {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(DefaultExecutionSlotAllocator.class);
 
-       /**
-        * Store the uncompleted slot assignments.
-        */
-       private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
pendingSlotAssignments;
-
        private final SlotProviderStrategy slotProviderStrategy;
 
-       private final PreferredLocationsRetriever preferredLocationsRetriever;
-
        public DefaultExecutionSlotAllocator(
                        final SlotProviderStrategy slotProviderStrategy,
                        final PreferredLocationsRetriever 
preferredLocationsRetriever) {
-               this.slotProviderStrategy = checkNotNull(slotProviderStrategy);
-               this.preferredLocationsRetriever = 
checkNotNull(preferredLocationsRetriever);
 
-               pendingSlotAssignments = new HashMap<>();
+               super(preferredLocationsRetriever);
+               this.slotProviderStrategy = checkNotNull(slotProviderStrategy);
        }
 
        @Override
@@ -86,89 +70,49 @@ public class DefaultExecutionSlotAllocator implements 
ExecutionSlotAllocator {
 
                for (ExecutionVertexSchedulingRequirements 
schedulingRequirements : executionVertexSchedulingRequirements) {
                        final ExecutionVertexID executionVertexId = 
schedulingRequirements.getExecutionVertexId();
-                       final SlotRequestId slotRequestId = new SlotRequestId();
                        final SlotSharingGroupId slotSharingGroupId = 
schedulingRequirements.getSlotSharingGroupId();
 
-                       LOG.debug("Allocate slot with id {} for execution {}", 
slotRequestId, executionVertexId);
-
-                       CompletableFuture<LogicalSlot> slotFuture = 
calculatePreferredLocations(
-                                       executionVertexId).thenCompose(
-                                                       
(Collection<TaskManagerLocation> preferredLocations) ->
-                                                               
slotProviderStrategy.allocateSlot(
-                                                                       
slotRequestId,
-                                                                       new 
ScheduledUnit(
-                                                                               
executionVertexId,
-                                                                               
slotSharingGroupId,
-                                                                               
schedulingRequirements.getCoLocationConstraint()),
-                                                                       
SlotProfile.priorAllocation(
-                                                                               
schedulingRequirements.getTaskResourceProfile(),
-                                                                               
schedulingRequirements.getPhysicalSlotResourceProfile(),
-                                                                               
preferredLocations,
-                                                                               
Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
-                                                                               
allPreviousAllocationIds)));
-
-                       SlotExecutionVertexAssignment 
slotExecutionVertexAssignment =
-                                       new 
SlotExecutionVertexAssignment(executionVertexId, slotFuture);
-                       // add to map first to avoid the future completed 
before added.
-                       pendingSlotAssignments.put(executionVertexId, 
slotExecutionVertexAssignment);
-
-                       slotFuture.whenComplete(
-                                       (ignored, throwable) -> {
-                                               
pendingSlotAssignments.remove(executionVertexId);
-                                               if (throwable != null) {
-                                                       
slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, 
throwable);
-                                               }
-                                       });
-
-                       
slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
-               }
+                       final SlotRequestId slotRequestId = new SlotRequestId();
 
-               return slotExecutionVertexAssignments;
-       }
+                       final CompletableFuture<LogicalSlot> slotFuture = 
allocateSlot(
+                               schedulingRequirements,
+                               slotRequestId,
+                               allPreviousAllocationIds);
 
-       private void 
validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements>
 schedulingRequirements) {
-               schedulingRequirements.stream()
-                       
.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
-                       .forEach(id -> checkState(
-                               !pendingSlotAssignments.containsKey(id),
-                               "BUG: vertex %s tries to allocate a slot when 
its previous slot request is still pending", id));
-       }
+                       final SlotExecutionVertexAssignment 
slotExecutionVertexAssignment =
+                               createAndRegisterSlotExecutionVertexAssignment(
+                                       executionVertexId,
+                                       slotFuture,
+                                       throwable -> 
slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, 
throwable));
 
-       @Override
-       public void cancel(ExecutionVertexID executionVertexId) {
-               SlotExecutionVertexAssignment slotExecutionVertexAssignment = 
pendingSlotAssignments.get(executionVertexId);
-               if (slotExecutionVertexAssignment != null) {
-                       
slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+                       
slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
                }
-       }
-
-       /**
-        * Calculates the preferred locations for an execution.
-        * It will first try to use preferred locations based on state,
-        * if null, will use the preferred locations based on inputs.
-        */
-       private CompletableFuture<Collection<TaskManagerLocation>> 
calculatePreferredLocations(
-                       ExecutionVertexID executionVertexId) {
-               return 
preferredLocationsRetriever.getPreferredLocations(executionVertexId, 
Collections.emptySet());
-       }
 
-       /**
-        * Computes and returns a set with the prior allocation ids from all 
execution vertices scheduled together.
-        *
-        * @param executionVertexSchedulingRequirements contains the execution 
vertices which are scheduled together
-        */
-       @VisibleForTesting
-       static Set<AllocationID> computeAllPriorAllocationIds(
-                       Collection<ExecutionVertexSchedulingRequirements> 
executionVertexSchedulingRequirements) {
-               return executionVertexSchedulingRequirements
-                       .stream()
-                       
.map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)
-                       .filter(Objects::nonNull)
-                       .collect(Collectors.toSet());
+               return slotExecutionVertexAssignments;
        }
 
-       @VisibleForTesting
-       int getNumberOfPendingSlotAssignments() {
-               return pendingSlotAssignments.size();
+       private CompletableFuture<LogicalSlot> allocateSlot(
+                       final ExecutionVertexSchedulingRequirements 
schedulingRequirements,
+                       final SlotRequestId slotRequestId,
+                       final Set<AllocationID> allPreviousAllocationIds) {
+
+               final ExecutionVertexID executionVertexId = 
schedulingRequirements.getExecutionVertexId();
+
+               LOG.debug("Allocate slot with id {} for execution {}", 
slotRequestId, executionVertexId);
+
+               final CompletableFuture<SlotProfile> slotProfileFuture = 
getSlotProfileFuture(
+                       schedulingRequirements,
+                       schedulingRequirements.getPhysicalSlotResourceProfile(),
+                       Collections.emptySet(),
+                       allPreviousAllocationIds);
+
+               return slotProfileFuture.thenCompose(
+                       slotProfile -> slotProviderStrategy.allocateSlot(
+                               slotRequestId,
+                               new ScheduledUnit(
+                                       executionVertexId,
+                                       
schedulingRequirements.getSlotSharingGroupId(),
+                                       
schedulingRequirements.getCoLocationConstraint()),
+                               slotProfile));
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
new file mode 100644
index 0000000..4077930
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AbstractExecutionSlotAllocator}.
+ */
+public class AbstractExecutionSlotAllocatorTest extends TestLogger {
+
+       private AbstractExecutionSlotAllocator executionSlotAllocator;
+
+       @Before
+       public void setUp() throws Exception {
+               executionSlotAllocator = new TestingExecutionSlotAllocator();
+       }
+
+       @Test
+       public void testCancel() {
+               final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
+
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
+                       createSchedulingRequirements(executionVertexId);
+               final List<SlotExecutionVertexAssignment> assignments =
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+               executionSlotAllocator.cancel(executionVertexId);
+
+               
assertThat(assignments.get(0).getLogicalSlotFuture().isCancelled(), is(true));
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testValidateSchedulingRequirements() {
+               final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
+
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
+                       createSchedulingRequirements(executionVertexId);
+               executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+               
executionSlotAllocator.validateSchedulingRequirements(schedulingRequirements);
+       }
+
+       @Test
+       public void testCreateAndRegisterSlotExecutionVertexAssignment() {
+               final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
+
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
+                       createSchedulingRequirements(executionVertexId);
+               final List<SlotExecutionVertexAssignment> assignments =
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+               assertThat(assignments, hasSize(1));
+
+               final SlotExecutionVertexAssignment assignment = 
assignments.get(0);
+               assertThat(assignment.getExecutionVertexId(), 
is(executionVertexId));
+               assertThat(assignment.getLogicalSlotFuture().isDone(), 
is(false));
+               
assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), 
contains(assignment));
+
+               assignment.getLogicalSlotFuture().cancel(false);
+
+               
assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), 
hasSize(0));
+       }
+
+       @Test
+       public void testCompletedExecutionVertexAssignmentWillBeUnregistered() {
+               final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
+
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
+                       createSchedulingRequirements(executionVertexId);
+               final List<SlotExecutionVertexAssignment> assignments =
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+               assignments.get(0).getLogicalSlotFuture().cancel(false);
+
+               
assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), 
hasSize(0));
+       }
+
+       @Test
+       public void testComputeAllPriorAllocationIds() {
+               final List<AllocationID> expectAllocationIds = 
Arrays.asList(new AllocationID(), new AllocationID());
+               final List<ExecutionVertexSchedulingRequirements> 
testSchedulingRequirements = Arrays.asList(
+                       new ExecutionVertexSchedulingRequirements.Builder().
+                               withExecutionVertexId(new ExecutionVertexID(new 
JobVertexID(), 0)).
+                               
withPreviousAllocationId(expectAllocationIds.get(0)).
+                               build(),
+                       new ExecutionVertexSchedulingRequirements.Builder().
+                               withExecutionVertexId(new ExecutionVertexID(new 
JobVertexID(), 1)).
+                               
withPreviousAllocationId(expectAllocationIds.get(0)).
+                               build(),
+                       new ExecutionVertexSchedulingRequirements.Builder().
+                               withExecutionVertexId(new ExecutionVertexID(new 
JobVertexID(), 2)).
+                               
withPreviousAllocationId(expectAllocationIds.get(1)).
+                               build(),
+                       new ExecutionVertexSchedulingRequirements.Builder().
+                               withExecutionVertexId(new ExecutionVertexID(new 
JobVertexID(), 3)).
+                               build()
+               );
+
+               final Set<AllocationID> allPriorAllocationIds =
+                       
AbstractExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
+               assertThat(allPriorAllocationIds, 
containsInAnyOrder(expectAllocationIds.toArray()));
+       }
+
+       private List<ExecutionVertexSchedulingRequirements> 
createSchedulingRequirements(
+                       final ExecutionVertexID... executionVertexIds) {
+
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements = new ArrayList<>(executionVertexIds.length);
+
+               for (ExecutionVertexID executionVertexId : executionVertexIds) {
+                       schedulingRequirements.add(new 
ExecutionVertexSchedulingRequirements.Builder()
+                               
.withExecutionVertexId(executionVertexId).build());
+               }
+               return schedulingRequirements;
+       }
+
+       private static class TestingExecutionSlotAllocator extends 
AbstractExecutionSlotAllocator {
+
+               TestingExecutionSlotAllocator() {
+                       super(
+                               new DefaultPreferredLocationsRetriever(
+                                       new TestingStateLocationRetriever(),
+                                       new 
TestingInputsLocationsRetriever.Builder().build()));
+               }
+
+               @Override
+               public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+                               final 
List<ExecutionVertexSchedulingRequirements> 
executionVertexSchedulingRequirements) {
+
+                       final List<SlotExecutionVertexAssignment> 
slotExecutionVertexAssignments =
+                               new 
ArrayList<>(executionVertexSchedulingRequirements.size());
+
+                       for (ExecutionVertexSchedulingRequirements 
schedulingRequirements : executionVertexSchedulingRequirements) {
+                               slotExecutionVertexAssignments.add(
+                                       
createAndRegisterSlotExecutionVertexAssignment(
+                                               
schedulingRequirements.getExecutionVertexId(),
+                                               new CompletableFuture<>(),
+                                               throwable -> {}));
+                       }
+
+                       return slotExecutionVertexAssignments;
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
index ad74357..0646a32 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
@@ -49,18 +49,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
 import static 
org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -111,7 +105,7 @@ public class DefaultExecutionSlotAllocatorTest extends 
TestLogger {
                inputsLocationsRetriever.assignTaskManagerLocation(producerId);
 
                
assertTrue(consumerSlotAssignment.getLogicalSlotFuture().isDone());
-               assertEquals(0, 
executionSlotAllocator.getNumberOfPendingSlotAssignments());
+               
assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), 
hasSize(0));
        }
 
        /**
@@ -160,109 +154,47 @@ public class DefaultExecutionSlotAllocatorTest extends 
TestLogger {
                assertThat(expectedSlotProfile.getPreferredLocations(), 
contains(taskManagerLocation));
        }
 
-       /**
-        * Tests that cancels an execution vertex which is not existed.
-        */
        @Test
-       public void testCancelNonExistingExecutionVertex() {
-               final DefaultExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator();
-
-               ExecutionVertexID inValidExecutionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
-               executionSlotAllocator.cancel(inValidExecutionVertexId);
-
-               assertThat(slotProvider.getCancelledSlotRequestIds(), 
is(empty()));
-       }
-
-       /**
-        * Tests that cancels a slot request which has already been fulfilled.
-        */
-       @Test
-       public void testCancelFulfilledSlotRequest() {
-               final ExecutionVertexID producerId = new ExecutionVertexID(new 
JobVertexID(), 0);
+       public void testDuplicatedSlotAllocationIsNotAllowed() {
+               final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
 
                final DefaultExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator();
+               slotProvider.disableSlotAllocation();
 
                final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
-                               createSchedulingRequirements(producerId);
+                       createSchedulingRequirements(executionVertexId);
                executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
 
-               executionSlotAllocator.cancel(producerId);
-
-               assertThat(slotProvider.getCancelledSlotRequestIds(), 
is(empty()));
+               try {
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+                       fail("exception should happen");
+               } catch (IllegalStateException e) {
+                       // IllegalStateException is expected
+               }
        }
 
-       /**
-        * Tests that cancels a slot request which has not been fulfilled.
-        */
        @Test
-       public void testCancelUnFulfilledSlotRequest() throws Exception {
-               final ExecutionVertexID producerId = new ExecutionVertexID(new 
JobVertexID(), 0);
-
+       public void testSlotAssignmentIsProperlyRegistered() {
                final DefaultExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator();
 
-               slotProvider.disableSlotAllocation();
+               final ExecutionVertexID executionVertexID = new 
ExecutionVertexID(new JobVertexID(), 0);
                final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
-                               createSchedulingRequirements(producerId);
-               Collection<SlotExecutionVertexAssignment> assignments = 
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
-
-               executionSlotAllocator.cancel(producerId);
-
-               assertThat(slotProvider.getCancelledSlotRequestIds(), 
hasSize(1));
-               assertThat(slotProvider.getCancelledSlotRequestIds(), 
contains(slotProvider.getReceivedSlotRequestIds().toArray()));
-
-               try {
-                       
assignments.iterator().next().getLogicalSlotFuture().get();
-                       fail("Expect a CancellationException but got nothing.");
-               } catch (CancellationException ignored) {
-                       // Expected exception
-               }
-       }
+                       createSchedulingRequirements(executionVertexID);
 
-       /**
-        * Tests that all prior allocation ids are computed by union all 
previous allocation ids in scheduling requirements.
-        */
-       @Test
-       public void testComputeAllPriorAllocationIds() {
-               List<AllocationID> expectAllocationIds = Arrays.asList(new 
AllocationID(), new AllocationID());
-               List<ExecutionVertexSchedulingRequirements> 
testSchedulingRequirements = Arrays.asList(
-                               new 
ExecutionVertexSchedulingRequirements.Builder().
-                                               withExecutionVertexId(new 
ExecutionVertexID(new JobVertexID(), 0)).
-                                               
withPreviousAllocationId(expectAllocationIds.get(0)).
-                                               build(),
-                               new 
ExecutionVertexSchedulingRequirements.Builder().
-                                               withExecutionVertexId(new 
ExecutionVertexID(new JobVertexID(), 1)).
-                                               
withPreviousAllocationId(expectAllocationIds.get(0)).
-                                               build(),
-                               new 
ExecutionVertexSchedulingRequirements.Builder().
-                                               withExecutionVertexId(new 
ExecutionVertexID(new JobVertexID(), 2)).
-                                               
withPreviousAllocationId(expectAllocationIds.get(1)).
-                                               build(),
-                               new 
ExecutionVertexSchedulingRequirements.Builder().
-                                               withExecutionVertexId(new 
ExecutionVertexID(new JobVertexID(), 3)).
-                                               build()
-               );
+               slotProvider.disableSlotAllocation();
+               final Collection<SlotExecutionVertexAssignment> 
slotExecutionVertexAssignments =
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
 
-               Set<AllocationID> allPriorAllocationIds = 
DefaultExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
-               assertThat(allPriorAllocationIds, 
containsInAnyOrder(expectAllocationIds.toArray()));
-       }
+               final SlotExecutionVertexAssignment slotAssignment = 
slotExecutionVertexAssignments.iterator().next();
 
-       @Test
-       public void testDuplicatedSlotAllocationIsNotAllowed() {
-               final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
+               
assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), 
contains(slotAssignment));
 
-               final DefaultExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator();
-               slotProvider.disableSlotAllocation();
+               executionSlotAllocator.cancel(executionVertexID);
 
-               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
-                       createSchedulingRequirements(executionVertexId);
-               executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+               
assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), 
hasSize(0));
 
-               try {
-                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
-                       fail("exception should happen");
-               } catch (IllegalStateException e) {
-                       // IllegalStateException is expected
-               }
+               final SlotRequestId slotRequestId = 
slotProvider.slotAllocationRequests.get(0).f0;
+               assertThat(slotProvider.getCancelledSlotRequestIds(), 
contains(slotRequestId));
        }
 
        private DefaultExecutionSlotAllocator createExecutionSlotAllocator() {
@@ -317,18 +249,12 @@ public class DefaultExecutionSlotAllocatorTest extends 
TestLogger {
                        return 
Collections.unmodifiableList(slotAllocationRequests);
                }
 
-               public List<SlotRequestId> getReceivedSlotRequestIds() {
-                       return slotAllocationRequests.stream()
-                                       .map(requestTuple -> requestTuple.f0)
-                                       .collect(Collectors.toList());
-               }
-
-               public List<SlotRequestId> getCancelledSlotRequestIds() {
-                       return 
Collections.unmodifiableList(cancelledSlotRequestIds);
-               }
-
                public void disableSlotAllocation() {
                        slotAllocationDisabled = true;
                }
+
+               List<SlotRequestId> getCancelledSlotRequestIds() {
+                       return cancelledSlotRequestIds;
+               }
        }
 }

Reply via email to