tillrohrmann commented on a change in pull request #14465: URL: https://github.com/apache/flink/pull/14465#discussion_r549400907
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java ########## @@ -413,7 +411,10 @@ private static ResourceProfile fulfilOneOfTwoSlotRequestsAndGetPendingProfile( return requests.get(slotRequestId2).getSlotProfile().getPhysicalSlotResourceProfile(); } - private enum PhysicalSlotFutureCompletionMode { + /** + * Enum for covering different ways of a Future to complete. + */ + public enum PhysicalSlotFutureCompletionMode { SUCCESS, FAILURE, MANUAL, Review comment: Hmm, shouldn't this enum be part of the `TestingPhysicalSlotProvider.java` file? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ########## @@ -215,22 +182,12 @@ public void testCanceledExecutionReturnsSlot() throws Exception { } ); - slotRequestIdFuture.thenAcceptAsync( - (SlotRequestId slotRequestId) -> { - final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot( - slotOwner, - taskManagerGateway, - slotRequestId); - slotProvider.complete(slotRequestId, singleLogicalSlot); - }, - testMainThreadUtil.getMainThreadExecutor()); - testMainThreadUtil.execute(scheduler::startScheduling); // cancel the execution in case we could schedule the execution testMainThreadUtil.execute(execution::cancel); - assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get()))); + assertThat(physicalSlotProvider.getRequests().keySet(), is(physicalSlotProvider.getCancellations().keySet())); Review comment: Much nicer :-) ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java ########## @@ -136,6 +136,16 @@ public static DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutio return newSchedulerBuilderWithSlotSharingExecutionSlotAllocator( jobGraph, new TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode.SUCCESS), + allocationTimeout); + } + + public static 
DefaultSchedulerBuilder newSchedulerBuilderWithSlotSharingExecutionSlotAllocator( Review comment: This method could be used in `ExecutionPartitionLifecycleTest.java` instead. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java ########## @@ -401,6 +401,58 @@ public static ExecutionAttemptID getAttemptId(DefaultScheduler scheduler, JobVer } } + public static SlotSharingExecutionSlotAllocatorFactoryBuilder slotSharingExecutionSlotAllocatorFactoryBuilder() { Review comment: Maybe name `newSlotSharingExecutionSlotAllocatorFactoryBuilder` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java ########## @@ -76,8 +91,10 @@ void completePhysicalSlotFutureFor( AllocationID allocationID) { ResourceProfile resourceProfile = requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile(); SharedSlotTestingUtils.TestingPhysicalSlot physicalSlot = new SharedSlotTestingUtils.TestingPhysicalSlot( - resourceProfile, - allocationID); + allocationID, + taskManagerLocation, + taskManagerGateway, + resourceProfile); Review comment: Nit: Maybe it would be simpler to provide a `TestingPhysicalSlotFactory`. Then, the `TestingPhysicalSlotProvider` wouldn't have to know about the `taskManagerLocation` and the `taskManagerGateway`. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java ########## @@ -160,14 +157,6 @@ private void finishExecution( ).join(); } - @Nonnull - private PhysicalSlotProvider createSlotProvider(SlotPool slotPool, ComponentMainThreadExecutor mainThreadExecutor) { - PhysicalSlotProviderImpl slotProvider = new PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool); - // scheduler.start(mainThreadExecutor); Review comment: Yes it should be fine to remove the start call as it only sets the main thread executor of the old `Scheduler` implementations. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java ########## @@ -161,11 +161,11 @@ private void finishExecution( } @Nonnull - private SlotProvider createSlotProvider(SlotPool slotPool, ComponentMainThreadExecutor mainThreadExecutor) { - final SchedulerImpl scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool); - scheduler.start(mainThreadExecutor); + private PhysicalSlotProvider createSlotProvider(SlotPool slotPool, ComponentMainThreadExecutor mainThreadExecutor) { + PhysicalSlotProviderImpl slotProvider = new PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool); + // scheduler.start(mainThreadExecutor); Review comment: Can this be removed? 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java ########## @@ -118,7 +118,7 @@ public void testSchedulingOfJobWithFewerSlotsThanParallelism() throws Exception Collections.singletonList(ResourceProfile.ANY), new RpcTaskManagerGateway(testingTaskExecutorGateway, JobMasterId.generate())); - final SlotProvider slotProvider = createSlotProvider(slotPool, mainThreadExecutor); + final PhysicalSlotProvider slotProvider = createSlotProvider(slotPool, mainThreadExecutor); Review comment: Maybe rename `createSlotProvider` into `createPhysicalSlotProvider`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java ########## @@ -85,13 +95,16 @@ private TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.Physic void completePhysicalSlotFutureFor( SlotRequestId slotRequestId, AllocationID allocationID) { - ResourceProfile resourceProfile = requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile(); - SharedSlotTestingUtils.TestingPhysicalSlot physicalSlot = new SharedSlotTestingUtils.TestingPhysicalSlot( - allocationID, - taskManagerLocation, - taskManagerGateway, - resourceProfile); - responses.get(slotRequestId).complete(physicalSlot); + synchronized (lock) { Review comment: If concurrency is a problem, shouldn't we also synchronize `allocatePhysicalSlot`? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java ########## @@ -132,4 +132,47 @@ PhysicalSlotRequest getFirstRequestOrFail() { Preconditions.checkState(element.isPresent()); return element.get(); } + + public static Builder builder() { + return new Builder(); + } + + /** + * {@code Builder} for creating {@code TestingPhysicalSlotProvider} instances. 
+ */ + public static class Builder { Review comment: Nice :-) ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java ########## @@ -125,19 +129,23 @@ PhysicalSlotRequest getFirstRequestOrFail() { return getFirstElementOrFail(requests.values()); } - Map<SlotRequestId, PhysicalSlotRequest> getRequests() { + public CountDownLatch getAllocateLatch() { + return allocateLatch; + } Review comment: If it is not important that we are using a latch, we could hide this detail by renaming this method to `waitUntilAllSlotsAreSuccessfullyAllocated` and then calling `allocateLatch.await()` inside of this method. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java ########## @@ -104,6 +107,7 @@ void completePhysicalSlotFutureFor( resourceProfile); responses.get(slotRequestId).complete(physicalSlot); availableSlotCount--; + allocateLatch.countDown(); Review comment: What are the exact semantics of the `allocateLatch`? Is it triggered once all slot requests have been successfully fulfilled?
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ########## @@ -578,21 +576,23 @@ public void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception { return CompletableFuture.completedFuture(Acknowledge.get()); }); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); final TestingTaskExecutorGateway taskExecutorGateway = testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway(); final RpcTaskManagerGateway taskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, JobMasterId.generate()); - final Collection<CompletableFuture<LogicalSlot>> slotFutures = new ArrayList<>(numberTasks); - for (int i = 0; i < numberTasks; i++) { - slotFutures.add(new CompletableFuture<>()); - } - - final SlotProvider slotProvider = new IteratorTestingSlotProvider(slotFutures.iterator()); + final Collection<CompletableFuture<TestingPhysicalSlot>> slotFutures = new ArrayList<>(numberTasks); final JobGraph jobGraph = new JobGraph(jobId, "Test Job", sourceVertex, sinkVertex); jobGraph.setScheduleMode(ScheduleMode.EAGER); - final SchedulerBase scheduler = SchedulerTestingUtils - .newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider) + final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(jobGraph) + .setExecutionSlotAllocatorFactory(SchedulerTestingUtils.slotSharingExecutionSlotAllocatorFactoryBuilder() + .withSlotProvider(TestingPhysicalSlotProvider.builder() + .withPhysicalSlotCreator((f, a, tml, tmg, r) -> slotFutures.add(f)) Review comment: Ah, OK, now I see why you have introduced the consumer. I am not 100% sold on this being the best way to achieve it. Maybe we can collect all the `SlotRequestIds` from the `TestingPhysicalSlotProvider` and then call `TestingPhysicalSlotProvider.completePhysicalSlotFutureFor` in a random order?
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java ########## @@ -56,8 +57,11 @@ private final CountDownLatch allocateLatch; private volatile int availableSlotCount; - private TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode physicalSlotFutureCompletion, TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, int availableSlotCount) { + private final QuinConsumer<CompletableFuture<TestingPhysicalSlot>, AllocationID, TaskManagerLocation, TaskManagerGateway, ResourceProfile> physicalSlotCreator; Review comment: Instead of completing the future inside of the consumer, we could also introduce a `TestingPhysicalSlot` supplier which creates the `TestingPhysicalSlot` and let the `TestingPhysicalSlotProvider` decide where to register it. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java ########## @@ -196,9 +201,15 @@ public Builder withAvailableSlotCount(int availableSlotCount) { return this; } + public Builder withPhysicalSlotCreator(QuinConsumer<CompletableFuture<TestingPhysicalSlot>, AllocationID, TaskManagerLocation, TaskManagerGateway, ResourceProfile> physicalSlotCreator) { Review comment: I would suggest not using side effects to transfer the created `TestingPhysicalSlot` out of the `physicalSlotCreator` and instead having something like `BiFunction<AllocationID, ResourceProfile, TestingPhysicalSlot>`, potentially even a `BiFunctionWithException` if we also want to allow the creation method to fail. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java ########## @@ -179,10 +176,10 @@ private SchedulerBase createScheduler( final JobVertex... 
vertices) throws Exception { final JobGraph jobGraph = new JobGraph(new JobID(), "test job", vertices); - final SlotProvider slotProvider = new TestingSlotProvider( - ignored -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot())); - final SchedulerBase scheduler = SchedulerTestingUtils - .newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, Time.seconds(10)) + final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(jobGraph) + .setExecutionSlotAllocatorFactory(SchedulerTestingUtils.slotSharingExecutionSlotAllocatorFactoryBuilder() Review comment: Why is the allocation timeout of 10s important? The test also passes with `withAllocationTimeout` removed. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingLocalInputPreferredSlotSharingStrategyFactory.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import java.util.Set; + +/** + * {@link LocalInputPreferredSlotSharingStrategy.Factory} extension that returns the most recently + * created {@link LocalInputPreferredSlotSharingStrategy} instance for testing purposes. + */ +public class TestingLocalInputPreferredSlotSharingStrategyFactory extends LocalInputPreferredSlotSharingStrategy.Factory { Review comment: I think this is not needed if we test the `TaskManagerLocations` instead of the `ExecutionSlotSharingGroupIDs`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java ########## @@ -130,22 +132,23 @@ public void testConstraintsAfterRestart() throws Exception { timeout); //checking execution vertex properties - validateConstraints(eg); + validateConstraints(eg, slotSharingStrategyFactory.getMostRecentStrategyInstance()); ExecutionGraphTestUtils.finishAllVertices(eg); assertThat(eg.getState(), is(FINISHED)); } - private void validateConstraints(ExecutionGraph eg) { + private void validateConstraints(ExecutionGraph eg, SlotSharingStrategy slotSharingStrategy) { ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]); for (int i = 0; i < NUM_TASKS; i++) { - CoLocationConstraint constr1 = tasks[0].getTaskVertices()[i].getLocationConstraint(); - CoLocationConstraint constr2 = tasks[1].getTaskVertices()[i].getLocationConstraint(); - assertThat(constr1.isAssigned(), is(true)); - assertThat(constr1.getLocation(), equalTo(constr2.getLocation())); + ExecutionVertexID executionVertexID0 = tasks[0].getTaskVertices()[i].getID(); + ExecutionVertexID executionVertexID1 = tasks[1].getTaskVertices()[i].getID(); + + 
assertThat(slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID0).getExecutionVertexIds(), hasItem(executionVertexID1)); + assertThat(slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID1).getExecutionVertexIds(), hasItem(executionVertexID0)); Review comment: Instead of relying on internal details, I think it would be good enough to assert the following: ``` final TaskManagerLocation location0 = tasks[0].getTaskVertices()[i] .getCurrentAssignedResourceLocation(); final TaskManagerLocation location1 = tasks[1].getTaskVertices()[i] .getCurrentAssignedResourceLocation(); assertThat(location0, is(equalTo(location1))); ``` Moreover, we should set the test up in a way that every `PhysicalSlot` has a different `TaskManagerLocation`. That way we ensure that the colocation constraints are honored. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org