This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6d9eb50862624cb2af2d77e747550e8ce28908bd Author: Zhu Zhu <[email protected]> AuthorDate: Thu Jun 11 22:50:00 2020 +0800 [FLINK-17018][runtime] Extract common logics of DefaultExecutionSlotAllocator into AbstractExecutionSlotAllocator --- .../scheduler/AbstractExecutionSlotAllocator.java | 131 +++++++++++++++ .../scheduler/DefaultExecutionSlotAllocator.java | 132 +++++---------- .../AbstractExecutionSlotAllocatorTest.java | 178 +++++++++++++++++++++ .../DefaultExecutionSlotAllocatorTest.java | 128 ++++----------- 4 files changed, 374 insertions(+), 195 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java new file mode 100644 index 0000000..d8dbf3b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Base class for all {@link ExecutionSlotAllocator}. It is responsible to allocate slots for tasks and + * keep the unfulfilled slot requests for further cancellation. + */ +abstract class AbstractExecutionSlotAllocator implements ExecutionSlotAllocator { + + private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments; + + private final PreferredLocationsRetriever preferredLocationsRetriever; + + AbstractExecutionSlotAllocator(final PreferredLocationsRetriever preferredLocationsRetriever) { + this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever); + this.pendingSlotAssignments = new HashMap<>(); + } + + @Override + public void cancel(final ExecutionVertexID executionVertexId) { + final SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId); + if (slotExecutionVertexAssignment != null) { + slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false); + } + } + + void validateSchedulingRequirements(final Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) { + schedulingRequirements.stream() + .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId) + .forEach(id -> checkState( + !pendingSlotAssignments.containsKey(id), + "BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id)); + } + + SlotExecutionVertexAssignment createAndRegisterSlotExecutionVertexAssignment( + final ExecutionVertexID executionVertexId, + final CompletableFuture<LogicalSlot> logicalSlotFuture, + final Consumer<Throwable> slotRequestFailureHandler) { + + final SlotExecutionVertexAssignment slotExecutionVertexAssignment = + new SlotExecutionVertexAssignment(executionVertexId, logicalSlotFuture); + + // add to map first in case the slot future is already completed + pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment); + + logicalSlotFuture.whenComplete( + (ignored, throwable) -> { + pendingSlotAssignments.remove(executionVertexId); + if (throwable != null) { + slotRequestFailureHandler.accept(throwable); + } + }); + + return slotExecutionVertexAssignment; + } + + CompletableFuture<SlotProfile> getSlotProfileFuture( + final ExecutionVertexSchedulingRequirements schedulingRequirements, + final ResourceProfile physicalSlotResourceProfile, + final Set<ExecutionVertexID> producersToIgnore, + final Set<AllocationID> allPreviousAllocationIds) { + + final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = + preferredLocationsRetriever.getPreferredLocations( + schedulingRequirements.getExecutionVertexId(), + producersToIgnore); + + return preferredLocationsFuture.thenApply( + preferredLocations -> + SlotProfile.priorAllocation( + schedulingRequirements.getTaskResourceProfile(), + physicalSlotResourceProfile, + preferredLocations, + Collections.singletonList(schedulingRequirements.getPreviousAllocationId()), + allPreviousAllocationIds)); + } + + @VisibleForTesting + static Set<AllocationID> computeAllPriorAllocationIds( + final Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) { + + return executionVertexSchedulingRequirements + .stream() + .map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } + + @VisibleForTesting + Map<ExecutionVertexID, SlotExecutionVertexAssignment> getPendingSlotAssignments() { + return Collections.unmodifiableMap(pendingSlotAssignments); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java index c7b2dd9..07da10e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.scheduler; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.executiongraph.SlotProviderStrategy; @@ -28,49 +27,34 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * Default {@link ExecutionSlotAllocator} which will use {@link SlotProvider} to allocate slots and * keep the unfulfilled requests for further cancellation. */ -public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator { +public class DefaultExecutionSlotAllocator extends AbstractExecutionSlotAllocator { private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionSlotAllocator.class); - /** - * Store the uncompleted slot assignments. - */ - private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments; - private final SlotProviderStrategy slotProviderStrategy; - private final PreferredLocationsRetriever preferredLocationsRetriever; - public DefaultExecutionSlotAllocator( final SlotProviderStrategy slotProviderStrategy, final PreferredLocationsRetriever preferredLocationsRetriever) { - this.slotProviderStrategy = checkNotNull(slotProviderStrategy); - this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever); - pendingSlotAssignments = new HashMap<>(); + super(preferredLocationsRetriever); + this.slotProviderStrategy = checkNotNull(slotProviderStrategy); } @Override @@ -86,89 +70,49 @@ public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator { for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) { final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId(); - final SlotRequestId slotRequestId = new SlotRequestId(); final SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId(); - LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId); - - CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations( - executionVertexId).thenCompose( - (Collection<TaskManagerLocation> preferredLocations) -> - slotProviderStrategy.allocateSlot( - slotRequestId, - new ScheduledUnit( - executionVertexId, - slotSharingGroupId, - schedulingRequirements.getCoLocationConstraint()), - SlotProfile.priorAllocation( - schedulingRequirements.getTaskResourceProfile(), - schedulingRequirements.getPhysicalSlotResourceProfile(), - preferredLocations, - Collections.singletonList(schedulingRequirements.getPreviousAllocationId()), - allPreviousAllocationIds))); - - SlotExecutionVertexAssignment slotExecutionVertexAssignment = - new SlotExecutionVertexAssignment(executionVertexId, slotFuture); - // add to map first to avoid the future completed before added. - pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment); - - slotFuture.whenComplete( - (ignored, throwable) -> { - pendingSlotAssignments.remove(executionVertexId); - if (throwable != null) { - slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable); - } - }); - - slotExecutionVertexAssignments.add(slotExecutionVertexAssignment); - } + final SlotRequestId slotRequestId = new SlotRequestId(); - return slotExecutionVertexAssignments; - } + final CompletableFuture<LogicalSlot> slotFuture = allocateSlot( + schedulingRequirements, + slotRequestId, + allPreviousAllocationIds); - private void validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) { - schedulingRequirements.stream() - .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId) - .forEach(id -> checkState( - !pendingSlotAssignments.containsKey(id), - "BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id)); - } + final SlotExecutionVertexAssignment slotExecutionVertexAssignment = + createAndRegisterSlotExecutionVertexAssignment( + executionVertexId, + slotFuture, + throwable -> slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable)); - @Override - public void cancel(ExecutionVertexID executionVertexId) { - SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId); - if (slotExecutionVertexAssignment != null) { - slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false); + slotExecutionVertexAssignments.add(slotExecutionVertexAssignment); } - } - - /** - * Calculates the preferred locations for an execution. - * It will first try to use preferred locations based on state, - * if null, will use the preferred locations based on inputs. - */ - private CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations( - ExecutionVertexID executionVertexId) { - return preferredLocationsRetriever.getPreferredLocations(executionVertexId, Collections.emptySet()); - } - /** - * Computes and returns a set with the prior allocation ids from all execution vertices scheduled together. - * - * @param executionVertexSchedulingRequirements contains the execution vertices which are scheduled together - */ - @VisibleForTesting - static Set<AllocationID> computeAllPriorAllocationIds( - Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) { - return executionVertexSchedulingRequirements - .stream() - .map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); + return slotExecutionVertexAssignments; } - @VisibleForTesting - int getNumberOfPendingSlotAssignments() { - return pendingSlotAssignments.size(); + private CompletableFuture<LogicalSlot> allocateSlot( + final ExecutionVertexSchedulingRequirements schedulingRequirements, + final SlotRequestId slotRequestId, + final Set<AllocationID> allPreviousAllocationIds) { + + final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId(); + + LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId); + + final CompletableFuture<SlotProfile> slotProfileFuture = getSlotProfileFuture( + schedulingRequirements, + schedulingRequirements.getPhysicalSlotResourceProfile(), + Collections.emptySet(), + allPreviousAllocationIds); + + return slotProfileFuture.thenCompose( + slotProfile -> slotProviderStrategy.allocateSlot( + slotRequestId, + new ScheduledUnit( + executionVertexId, + schedulingRequirements.getSlotSharingGroupId(), + schedulingRequirements.getCoLocationConstraint()), + slotProfile)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java new file mode 100644 index 0000000..4077930 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link AbstractExecutionSlotAllocator}. + */ +public class AbstractExecutionSlotAllocatorTest extends TestLogger { + + private AbstractExecutionSlotAllocator executionSlotAllocator; + + @Before + public void setUp() throws Exception { + executionSlotAllocator = new TestingExecutionSlotAllocator(); + } + + @Test + public void testCancel() { + final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0); + + final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = + createSchedulingRequirements(executionVertexId); + final List<SlotExecutionVertexAssignment> assignments = + executionSlotAllocator.allocateSlotsFor(schedulingRequirements); + + executionSlotAllocator.cancel(executionVertexId); + + assertThat(assignments.get(0).getLogicalSlotFuture().isCancelled(), is(true)); + } + + @Test(expected = IllegalStateException.class) + public void testValidateSchedulingRequirements() { + final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0); + + final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = + createSchedulingRequirements(executionVertexId); + executionSlotAllocator.allocateSlotsFor(schedulingRequirements); + + executionSlotAllocator.validateSchedulingRequirements(schedulingRequirements); + } + + @Test + public void testCreateAndRegisterSlotExecutionVertexAssignment() { + final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0); + + final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = + createSchedulingRequirements(executionVertexId); + final List<SlotExecutionVertexAssignment> assignments = + executionSlotAllocator.allocateSlotsFor(schedulingRequirements); + + assertThat(assignments, hasSize(1)); + + final SlotExecutionVertexAssignment assignment = assignments.get(0); + assertThat(assignment.getExecutionVertexId(), is(executionVertexId)); + assertThat(assignment.getLogicalSlotFuture().isDone(), is(false)); + assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), contains(assignment)); + + assignment.getLogicalSlotFuture().cancel(false); + + assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0)); + } + + @Test + public void testCompletedExecutionVertexAssignmentWillBeUnregistered() { + final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0); + + final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = + createSchedulingRequirements(executionVertexId); + final List<SlotExecutionVertexAssignment> assignments = + executionSlotAllocator.allocateSlotsFor(schedulingRequirements); + + assignments.get(0).getLogicalSlotFuture().cancel(false); + + assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0)); + } + + @Test + public void testComputeAllPriorAllocationIds() { + final List<AllocationID> expectAllocationIds = Arrays.asList(new AllocationID(), new AllocationID()); + final List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements = Arrays.asList( + new ExecutionVertexSchedulingRequirements.Builder(). + withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)). + withPreviousAllocationId(expectAllocationIds.get(0)). + build(), + new ExecutionVertexSchedulingRequirements.Builder(). + withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1)). + withPreviousAllocationId(expectAllocationIds.get(0)). + build(), + new ExecutionVertexSchedulingRequirements.Builder(). + withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2)). + withPreviousAllocationId(expectAllocationIds.get(1)). + build(), + new ExecutionVertexSchedulingRequirements.Builder(). + withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3)). + build() + ); + + final Set<AllocationID> allPriorAllocationIds = + AbstractExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements); + assertThat(allPriorAllocationIds, containsInAnyOrder(expectAllocationIds.toArray())); + } + + private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements( + final ExecutionVertexID... executionVertexIds) { + + final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = new ArrayList<>(executionVertexIds.length); + + for (ExecutionVertexID executionVertexId : executionVertexIds) { + schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder() + .withExecutionVertexId(executionVertexId).build()); + } + return schedulingRequirements; + } + + private static class TestingExecutionSlotAllocator extends AbstractExecutionSlotAllocator { + + TestingExecutionSlotAllocator() { + super( + new DefaultPreferredLocationsRetriever( + new TestingStateLocationRetriever(), + new TestingInputsLocationsRetriever.Builder().build())); + } + + @Override + public List<SlotExecutionVertexAssignment> allocateSlotsFor( + final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) { + + final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = + new ArrayList<>(executionVertexSchedulingRequirements.size()); + + for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) { + slotExecutionVertexAssignments.add( + createAndRegisterSlotExecutionVertexAssignment( + schedulingRequirements.getExecutionVertexId(), + new CompletableFuture<>(), + throwable -> {})); + } + + return slotExecutionVertexAssignments; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java index ad74357..0646a32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java @@ -49,18 +49,12 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Set; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements; import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -111,7 +105,7 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger { inputsLocationsRetriever.assignTaskManagerLocation(producerId); assertTrue(consumerSlotAssignment.getLogicalSlotFuture().isDone()); - assertEquals(0, executionSlotAllocator.getNumberOfPendingSlotAssignments()); + assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0)); } /** @@ -160,109 +154,47 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger { assertThat(expectedSlotProfile.getPreferredLocations(), contains(taskManagerLocation)); } - /** - * Tests that cancels an execution vertex which is not existed. - */ @Test - public void testCancelNonExistingExecutionVertex() { - final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(); - - ExecutionVertexID inValidExecutionVertexId = new ExecutionVertexID(new JobVertexID(), 0); - executionSlotAllocator.cancel(inValidExecutionVertexId); - - assertThat(slotProvider.getCancelledSlotRequestIds(), is(empty())); - } - - /** - * Tests that cancels a slot request which has already been fulfilled. - */ - @Test - public void testCancelFulfilledSlotRequest() { - final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0); + public void testDuplicatedSlotAllocationIsNotAllowed() { + final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0); final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(); + slotProvider.disableSlotAllocation(); final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = - createSchedulingRequirements(producerId); + createSchedulingRequirements(executionVertexId); executionSlotAllocator.allocateSlotsFor(schedulingRequirements); - executionSlotAllocator.cancel(producerId); - - assertThat(slotProvider.getCancelledSlotRequestIds(), is(empty())); + try { + executionSlotAllocator.allocateSlotsFor(schedulingRequirements); + fail("exception should happen"); + } catch (IllegalStateException e) { + // IllegalStateException is expected + } } - /** - * Tests that cancels a slot request which has not been fulfilled. - */ @Test - public void testCancelUnFulfilledSlotRequest() throws Exception { - final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0); - + public void testSlotAssignmentIsProperlyRegistered() { final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(); - slotProvider.disableSlotAllocation(); + final ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0); final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = - createSchedulingRequirements(producerId); - Collection<SlotExecutionVertexAssignment> assignments = executionSlotAllocator.allocateSlotsFor(schedulingRequirements); - - executionSlotAllocator.cancel(producerId); - - assertThat(slotProvider.getCancelledSlotRequestIds(), hasSize(1)); - assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotProvider.getReceivedSlotRequestIds().toArray())); - - try { - assignments.iterator().next().getLogicalSlotFuture().get(); - fail("Expect a CancellationException but got nothing."); - } catch (CancellationException ignored) { - // Expected exception - } - } + createSchedulingRequirements(executionVertexID); - /** - * Tests that all prior allocation ids are computed by union all previous allocation ids in scheduling requirements. - */ - @Test - public void testComputeAllPriorAllocationIds() { - List<AllocationID> expectAllocationIds = Arrays.asList(new AllocationID(), new AllocationID()); - List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements = Arrays.asList( - new ExecutionVertexSchedulingRequirements.Builder(). - withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)). - withPreviousAllocationId(expectAllocationIds.get(0)). - build(), - new ExecutionVertexSchedulingRequirements.Builder(). - withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1)). - withPreviousAllocationId(expectAllocationIds.get(0)). - build(), - new ExecutionVertexSchedulingRequirements.Builder(). - withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2)). - withPreviousAllocationId(expectAllocationIds.get(1)). - build(), - new ExecutionVertexSchedulingRequirements.Builder(). - withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3)). - build() - ); + slotProvider.disableSlotAllocation(); + final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = + executionSlotAllocator.allocateSlotsFor(schedulingRequirements); - Set<AllocationID> allPriorAllocationIds = DefaultExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements); - assertThat(allPriorAllocationIds, containsInAnyOrder(expectAllocationIds.toArray())); - } + final SlotExecutionVertexAssignment slotAssignment = slotExecutionVertexAssignments.iterator().next(); - @Test - public void testDuplicatedSlotAllocationIsNotAllowed() { - final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0); + assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), contains(slotAssignment)); - final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(); - slotProvider.disableSlotAllocation(); + executionSlotAllocator.cancel(executionVertexID); - final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = - createSchedulingRequirements(executionVertexId); - executionSlotAllocator.allocateSlotsFor(schedulingRequirements); + assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0)); - try { - executionSlotAllocator.allocateSlotsFor(schedulingRequirements); - fail("exception should happen"); - } catch (IllegalStateException e) { - // IllegalStateException is expected - } + final SlotRequestId slotRequestId = slotProvider.slotAllocationRequests.get(0).f0; + assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotRequestId)); } private DefaultExecutionSlotAllocator createExecutionSlotAllocator() { @@ -317,18 +249,12 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger { return Collections.unmodifiableList(slotAllocationRequests); } - public List<SlotRequestId> getReceivedSlotRequestIds() { - return slotAllocationRequests.stream() - .map(requestTuple -> requestTuple.f0) - .collect(Collectors.toList()); - } - - public List<SlotRequestId> getCancelledSlotRequestIds() { - return Collections.unmodifiableList(cancelledSlotRequestIds); - } - public void disableSlotAllocation() { slotAllocationDisabled = true; } + + List<SlotRequestId> getCancelledSlotRequestIds() { + return cancelledSlotRequestIds; + } } }
