This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95cf59a669610bfcf12a561281322113ea32cd90
Author: Zhu Zhu <[email protected]>
AuthorDate: Thu Jun 11 22:57:00 2020 +0800

    [FLINK-17018][runtime] Introduce OneSlotPerExecutionSlotAllocator which 
will request one physical slot for each single execution vertex
    
    OneSlotPerExecutionSlotAllocator allocates slots in bulks so that the 
SlotProvider can check whether this bulk of slot requests can be fulfilled at 
the same time.
    It has several limitations:
    1. Slot sharing will be ignored.
    2. Co-location constraints are not allowed.
    3. Intra-bulk input location preferences will be ignored.
---
 .../jobmaster/slotpool/PhysicalSlotRequest.java    |   6 +-
 .../jobmaster/slotpool/SingleLogicalSlot.java      |   2 +-
 .../OneSlotPerExecutionSlotAllocator.java          | 217 +++++++++++++++
 .../OneSlotPerExecutionSlotAllocatorTest.java      | 304 +++++++++++++++++++++
 4 files changed, 525 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
index b953e43..60030ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
@@ -42,11 +42,11 @@ public class PhysicalSlotRequest {
                this.slotWillBeOccupiedIndefinitely = 
slotWillBeOccupiedIndefinitely;
        }
 
-       SlotRequestId getSlotRequestId() {
+       public SlotRequestId getSlotRequestId() {
                return slotRequestId;
        }
 
-       SlotProfile getSlotProfile() {
+       public SlotProfile getSlotProfile() {
                return slotProfile;
        }
 
@@ -63,7 +63,7 @@ public class PhysicalSlotRequest {
 
                private final PhysicalSlot physicalSlot;
 
-               Result(final SlotRequestId slotRequestId, final PhysicalSlot 
physicalSlot) {
+               public Result(final SlotRequestId slotRequestId, final 
PhysicalSlot physicalSlot) {
                        this.slotRequestId = slotRequestId;
                        this.physicalSlot = physicalSlot;
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index 710f003..f5ba44c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -166,7 +166,7 @@ public class SingleLogicalSlot implements LogicalSlot, 
PhysicalSlot.Payload {
                return slotSharingGroupId;
        }
 
-       static SingleLogicalSlot allocateFromPhysicalSlot(
+       public static SingleLogicalSlot allocateFromPhysicalSlot(
                        final SlotRequestId slotRequestId,
                        final PhysicalSlot physicalSlot,
                        final Locality locality,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
new file mode 100644
index 0000000..c350f5d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single 
execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can 
check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * <p>1. Slot sharing will be ignored.
+ *
+ * <p>2. Co-location constraints are not allowed.
+ *
+ * <p>3. Intra-bulk input location preferences will be ignored.
+ */
+class OneSlotPerExecutionSlotAllocator extends AbstractExecutionSlotAllocator 
implements SlotOwner {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+       private final BulkSlotProvider slotProvider;
+
+       private final boolean slotWillBeOccupiedIndefinitely;
+
+       private final Time allocationTimeout;
+
+       OneSlotPerExecutionSlotAllocator(
+                       final BulkSlotProvider slotProvider,
+                       final PreferredLocationsRetriever 
preferredLocationsRetriever,
+                       final boolean slotWillBeOccupiedIndefinitely,
+                       final Time allocationTimeout) {
+
+               super(preferredLocationsRetriever);
+
+               this.slotProvider = checkNotNull(slotProvider);
+               this.slotWillBeOccupiedIndefinitely = 
slotWillBeOccupiedIndefinitely;
+               this.allocationTimeout = checkNotNull(allocationTimeout);
+       }
+
+       @Override
+       public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+                       final List<ExecutionVertexSchedulingRequirements> 
executionVertexSchedulingRequirements) {
+
+               
validateSchedulingRequirements(executionVertexSchedulingRequirements);
+
+               
validateNoCoLocationConstraint(executionVertexSchedulingRequirements);
+
+               // LinkedHashMap is needed to retain the given order
+               final LinkedHashMap<SlotRequestId, 
SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+                       
createSlotExecutionVertexAssignments(executionVertexSchedulingRequirements);
+
+               final Map<ExecutionVertexID, SlotRequestId> 
executionVertexSlotRequestIds = slotExecutionVertexAssignments
+                       .entrySet()
+                       .stream()
+                       .collect(Collectors.toMap(e -> 
e.getValue().getExecutionVertexId(), Map.Entry::getKey));
+
+               final List<CompletableFuture<PhysicalSlotRequest>> 
physicalSlotRequestFutures =
+                       createPhysicalSlotRequestFutures(
+                               executionVertexSchedulingRequirements,
+                               executionVertexSlotRequestIds);
+
+               allocateSlotsForAssignments(
+                       physicalSlotRequestFutures,
+                       slotExecutionVertexAssignments);
+
+               return Collections.unmodifiableList(new 
ArrayList<>(slotExecutionVertexAssignments.values()));
+       }
+
+       private static void validateNoCoLocationConstraint(
+                       final Collection<ExecutionVertexSchedulingRequirements> 
schedulingRequirements) {
+
+               final boolean hasCoLocationConstraint = 
schedulingRequirements.stream()
+                       .anyMatch(r -> r.getCoLocationConstraint() != null);
+               checkState(
+                       !hasCoLocationConstraint,
+                       "Jobs with co-location constraints are not allowed to 
run with pipelined region scheduling strategy.");
+       }
+
+       private LinkedHashMap<SlotRequestId, SlotExecutionVertexAssignment> 
createSlotExecutionVertexAssignments(
+                       final List<ExecutionVertexSchedulingRequirements> 
executionVertexSchedulingRequirements) {
+
+               final LinkedHashMap<SlotRequestId, 
SlotExecutionVertexAssignment> assignments = new LinkedHashMap<>();
+               for (ExecutionVertexSchedulingRequirements 
schedulingRequirements : executionVertexSchedulingRequirements) {
+                       final ExecutionVertexID executionVertexId = 
schedulingRequirements.getExecutionVertexId();
+
+                       final SlotRequestId slotRequestId = new SlotRequestId();
+                       final SlotExecutionVertexAssignment 
slotExecutionVertexAssignment =
+                               createAndRegisterSlotExecutionVertexAssignment(
+                                       executionVertexId,
+                                       new CompletableFuture<>(),
+                                       throwable -> 
slotProvider.cancelSlotRequest(slotRequestId, throwable));
+                       assignments.put(slotRequestId, 
slotExecutionVertexAssignment);
+               }
+
+               return assignments;
+       }
+
+       private List<CompletableFuture<PhysicalSlotRequest>> 
createPhysicalSlotRequestFutures(
+                       final List<ExecutionVertexSchedulingRequirements> 
executionVertexSchedulingRequirements,
+                       final Map<ExecutionVertexID, SlotRequestId> 
executionVertexSlotRequestIds) {
+
+               final Set<AllocationID> allPreviousAllocationIds =
+                       
computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
+
+               final List<CompletableFuture<PhysicalSlotRequest>> 
physicalSlotRequestFutures =
+                       new 
ArrayList<>(executionVertexSchedulingRequirements.size());
+               for (ExecutionVertexSchedulingRequirements 
schedulingRequirements : executionVertexSchedulingRequirements) {
+                       final ExecutionVertexID executionVertexId = 
schedulingRequirements.getExecutionVertexId();
+                       final SlotRequestId slotRequestId = 
executionVertexSlotRequestIds.get(executionVertexId);
+
+                       LOG.debug("Allocate slot with id {} for execution {}", 
slotRequestId, executionVertexId);
+
+                       // use the task resource profile as the physical slot 
resource requirement since slot sharing is ignored
+                       final CompletableFuture<SlotProfile> slotProfileFuture 
= getSlotProfileFuture(
+                               schedulingRequirements,
+                               schedulingRequirements.getTaskResourceProfile(),
+                               executionVertexSlotRequestIds.keySet(),
+                               allPreviousAllocationIds);
+
+                       final CompletableFuture<PhysicalSlotRequest> 
physicalSlotRequestFuture =
+                               slotProfileFuture.thenApply(
+                                       slotProfile -> 
createPhysicalSlotRequest(slotRequestId, slotProfile));
+                       
physicalSlotRequestFutures.add(physicalSlotRequestFuture);
+               }
+
+               return physicalSlotRequestFutures;
+       }
+
+       private PhysicalSlotRequest createPhysicalSlotRequest(
+                       final SlotRequestId slotRequestId,
+                       final SlotProfile slotProfile) {
+               return new PhysicalSlotRequest(slotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely);
+       }
+
+       private void allocateSlotsForAssignments(
+                       final List<CompletableFuture<PhysicalSlotRequest>> 
physicalSlotRequestFutures,
+                       final Map<SlotRequestId, SlotExecutionVertexAssignment> 
slotExecutionVertexAssignments) {
+
+               FutureUtils.combineAll(physicalSlotRequestFutures)
+                       .thenCompose(physicalSlotRequests -> 
slotProvider.allocatePhysicalSlots(physicalSlotRequests, allocationTimeout))
+                       .thenAccept(physicalSlotRequestResults -> {
+                               for (PhysicalSlotRequest.Result result : 
physicalSlotRequestResults) {
+                                       final SlotRequestId slotRequestId = 
result.getSlotRequestId();
+                                       final SlotExecutionVertexAssignment 
assignment = slotExecutionVertexAssignments.get(slotRequestId);
+
+                                       checkState(assignment != null);
+
+                                       final LogicalSlot logicalSlot = 
SingleLogicalSlot.allocateFromPhysicalSlot(
+                                               slotRequestId,
+                                               result.getPhysicalSlot(),
+                                               Locality.UNKNOWN,
+                                               this,
+                                               slotWillBeOccupiedIndefinitely);
+                                       
assignment.getLogicalSlotFuture().complete(logicalSlot);
+                               }
+                       })
+                       .exceptionally(ex -> {
+                               slotExecutionVertexAssignments.values().forEach(
+                                       assignment -> 
assignment.getLogicalSlotFuture().completeExceptionally(ex));
+                               return null;
+                       });
+       }
+
+       @Override
+       public void returnLogicalSlot(LogicalSlot logicalSlot) {
+               slotProvider.cancelSlotRequest(
+                       logicalSlot.getSlotRequestId(),
+                       new FlinkException("Slot is being returned to 
OneSlotPerExecutionSlotAllocator."));
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java
new file mode 100644
index 0000000..c74f762
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.createPhysicalSlot;
+import static 
org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
+import static 
org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link OneSlotPerExecutionSlotAllocator}.
+ */
+public class OneSlotPerExecutionSlotAllocatorTest extends TestLogger {
+
+       private TestingBulkSlotProvider slotProvider;
+
+       @Before
+       public void setUp() throws Exception {
+               slotProvider = new TestingBulkSlotProvider();
+       }
+
+       @Test
+       public void testSucceededSlotAllocation() {
+               final ExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator();
+
+               final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
+                       createSchedulingRequirements(executionVertexId);
+
+               final Collection<SlotExecutionVertexAssignment> 
slotExecutionVertexAssignments =
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+               assertThat(slotExecutionVertexAssignments, hasSize(1));
+
+               final SlotExecutionVertexAssignment slotAssignment = 
slotExecutionVertexAssignments.iterator().next();
+
+               assertThat(slotAssignment.getExecutionVertexId(), 
equalTo(executionVertexId));
+               assertThat(slotAssignment.getLogicalSlotFuture().isDone(), 
is(true));
+               
assertThat(slotAssignment.getLogicalSlotFuture().isCompletedExceptionally(), 
is(false));
+       }
+
+       @Test
+       public void testFailedSlotAllocation() {
+               final OneSlotPerExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator();
+
+               final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
+                       createSchedulingRequirements(executionVertexId);
+
+               slotProvider.forceFailingSlotAllocation();
+               final Collection<SlotExecutionVertexAssignment> 
slotExecutionVertexAssignments =
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+               final SlotExecutionVertexAssignment slotAssignment = 
slotExecutionVertexAssignments.iterator().next();
+
+               
assertThat(slotAssignment.getLogicalSlotFuture().isCompletedExceptionally(), 
is(true));
+               
assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), 
hasSize(0));
+
+               final SlotRequestId slotRequestId = 
slotProvider.getSlotRequests().get(0).getSlotRequestId();
+               assertThat(slotProvider.getCancelledSlotRequestIds(), 
contains(slotRequestId));
+       }
+
+       @Test
+       public void testInterBulkInputLocationPreferencesAreRespected() {
+               final ExecutionVertexID producerId = new ExecutionVertexID(new 
JobVertexID(), 0);
+               final ExecutionVertexID consumerId = new ExecutionVertexID(new 
JobVertexID(), 0);
+
+               final TestingInputsLocationsRetriever inputsLocationsRetriever 
= new TestingInputsLocationsRetriever.Builder()
+                       .connectConsumerToProducer(consumerId, producerId)
+                       .build();
+
+               final ExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator(
+                       new TestingStateLocationRetriever(),
+                       inputsLocationsRetriever);
+
+               inputsLocationsRetriever.markScheduled(producerId);
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirementsForProducer =
+                       createSchedulingRequirements(producerId);
+               final Collection<SlotExecutionVertexAssignment> 
slotExecutionVertexAssignmentsForProducer =
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirementsForProducer);
+               final SlotExecutionVertexAssignment producerSlotAssignment =
+                       findSlotAssignmentByExecutionVertexId(producerId, 
slotExecutionVertexAssignmentsForProducer);
+
+               
assertThat(producerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+
+               inputsLocationsRetriever.markScheduled(consumerId);
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirementsForConsumer =
+                       createSchedulingRequirements(consumerId);
+               final Collection<SlotExecutionVertexAssignment> 
slotExecutionVertexAssignmentsForConsumer =
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirementsForConsumer);
+               final SlotExecutionVertexAssignment consumerSlotAssignment =
+                       findSlotAssignmentByExecutionVertexId(consumerId, 
slotExecutionVertexAssignmentsForConsumer);
+
+               
assertThat(consumerSlotAssignment.getLogicalSlotFuture().isDone(), is(false));
+
+               inputsLocationsRetriever.assignTaskManagerLocation(producerId);
+
+               
assertThat(consumerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+       }
+
+       @Test
+       public void testIntraBulkInputLocationPreferencesDoNotBlockAllocation() 
{
+               final ExecutionVertexID producerId = new ExecutionVertexID(new 
JobVertexID(), 0);
+               final ExecutionVertexID consumerId = new ExecutionVertexID(new 
JobVertexID(), 0);
+
+               final TestingInputsLocationsRetriever inputsLocationsRetriever 
= new TestingInputsLocationsRetriever.Builder()
+                       .connectConsumerToProducer(consumerId, producerId)
+                       .build();
+
+               final ExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator(
+                       new TestingStateLocationRetriever(),
+                       inputsLocationsRetriever);
+
+               inputsLocationsRetriever.markScheduled(producerId);
+               inputsLocationsRetriever.markScheduled(consumerId);
+
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements =
+                       createSchedulingRequirements(producerId, consumerId);
+               final Collection<SlotExecutionVertexAssignment> 
slotExecutionVertexAssignments =
+                       
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+               assertThat(slotExecutionVertexAssignments, hasSize(2));
+
+               final SlotExecutionVertexAssignment producerSlotAssignment =
+                       findSlotAssignmentByExecutionVertexId(producerId, 
slotExecutionVertexAssignments);
+               final SlotExecutionVertexAssignment consumerSlotAssignment =
+                       findSlotAssignmentByExecutionVertexId(consumerId, 
slotExecutionVertexAssignments);
+
+               
assertThat(producerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+               
assertThat(consumerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+       }
+
+       @Test
+       public void testCreatedSlotRequests() {
+               final ExecutionVertexID executionVertexId = new 
ExecutionVertexID(new JobVertexID(), 0);
+               final AllocationID allocationId = new AllocationID();
+               final SlotSharingGroupId sharingGroupId = new 
SlotSharingGroupId();
+               final ResourceProfile taskResourceProfile = 
ResourceProfile.fromResources(0.5, 250);
+               final ResourceProfile physicalSlotResourceProfile = 
ResourceProfile.fromResources(1.0, 300);
+               final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+
+               final TestingStateLocationRetriever stateLocationRetriever = 
new TestingStateLocationRetriever();
+               stateLocationRetriever.setStateLocation(executionVertexId, 
taskManagerLocation);
+
+               final ExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator(
+                       stateLocationRetriever,
+                       new TestingInputsLocationsRetriever.Builder().build());
+
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements = Collections.singletonList(
+                       new ExecutionVertexSchedulingRequirements.Builder()
+                               .withExecutionVertexId(executionVertexId)
+                               .withPreviousAllocationId(allocationId)
+                               .withSlotSharingGroupId(sharingGroupId)
+                               
.withPhysicalSlotResourceProfile(physicalSlotResourceProfile)
+                               .withTaskResourceProfile(taskResourceProfile)
+                               .build()
+               );
+
+               executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+               assertThat(slotProvider.getSlotRequests(), hasSize(1));
+
+               final SlotProfile requestSlotProfile = 
slotProvider.getSlotRequests().iterator().next().getSlotProfile();
+
+               assertThat(requestSlotProfile.getPreferredAllocations(), 
contains(allocationId));
+               
assertThat(requestSlotProfile.getPreviousExecutionGraphAllocations(), 
contains(allocationId));
+               assertThat(requestSlotProfile.getTaskResourceProfile(), 
equalTo(taskResourceProfile));
+               assertThat(requestSlotProfile.getPreferredLocations(), 
contains(taskManagerLocation));
+               // task resource profile is used instead of slot sharing group 
resource profile since slot sharing is ignored
+               assertThat(requestSlotProfile.getPhysicalSlotResourceProfile(), 
equalTo(taskResourceProfile));
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testCoLocationConstraintThrowsException() {
+               final ExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator();
+
+               final CoLocationConstraint coLocationConstraint = new 
CoLocationGroup().getLocationConstraint(0);
+               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements = Collections.singletonList(
+                       new ExecutionVertexSchedulingRequirements.Builder()
+                               .withExecutionVertexId(new 
ExecutionVertexID(new JobVertexID(), 0))
+                               .withCoLocationConstraint(coLocationConstraint)
+                               .build()
+               );
+
+               executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+       }
+
+       private OneSlotPerExecutionSlotAllocator createExecutionSlotAllocator() 
{
+               return createExecutionSlotAllocator(
+                       new TestingStateLocationRetriever(),
+                       new TestingInputsLocationsRetriever.Builder().build());
+       }
+
+       private OneSlotPerExecutionSlotAllocator createExecutionSlotAllocator(
+                       final StateLocationRetriever stateLocationRetriever,
+                       final InputsLocationsRetriever 
inputsLocationsRetriever) {
+
+               return new OneSlotPerExecutionSlotAllocator(
+                       slotProvider,
+                       new 
DefaultPreferredLocationsRetriever(stateLocationRetriever, 
inputsLocationsRetriever),
+                       true,
+                       Time.seconds(10));
+       }
+
+       private static class TestingBulkSlotProvider implements 
BulkSlotProvider {
+
+               private final List<PhysicalSlotRequest> slotRequests = new 
ArrayList<>();
+
+               private final List<SlotRequestId> cancelledSlotRequestIds = new 
ArrayList<>();
+
+               private boolean forceFailingSlotAllocation = false;
+
+               @Override
+               public void start(ComponentMainThreadExecutor 
mainThreadExecutor) {
+               }
+
+               @Override
+               public 
CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+                               final Collection<PhysicalSlotRequest> 
physicalSlotRequests,
+                               final Time timeout) {
+
+                       slotRequests.addAll(physicalSlotRequests);
+
+                       if (forceFailingSlotAllocation) {
+                               return FutureUtils.completedExceptionally(new 
Exception("Forced failure"));
+                       }
+
+                       final List<PhysicalSlotRequest.Result> results = new 
ArrayList<>(physicalSlotRequests.size());
+                       for (PhysicalSlotRequest request : 
physicalSlotRequests) {
+                               final PhysicalSlotRequest.Result result = new 
PhysicalSlotRequest.Result(
+                                       request.getSlotRequestId(),
+                                       createPhysicalSlot());
+                               results.add(result);
+                       }
+                       return CompletableFuture.completedFuture(results);
+               }
+
+               @Override
+               public void cancelSlotRequest(
+                               final SlotRequestId slotRequestId,
+                               final Throwable cause) {
+                       cancelledSlotRequestIds.add(slotRequestId);
+               }
+
+               List<PhysicalSlotRequest> getSlotRequests() {
+                       return Collections.unmodifiableList(slotRequests);
+               }
+
+               List<SlotRequestId> getCancelledSlotRequestIds() {
+                       return 
Collections.unmodifiableList(cancelledSlotRequestIds);
+               }
+
+               void forceFailingSlotAllocation() {
+                       this.forceFailingSlotAllocation = true;
+               }
+       }
+
+}

Reply via email to