tillrohrmann commented on a change in pull request #14465:
URL: https://github.com/apache/flink/pull/14465#discussion_r549400907



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
##########
@@ -413,7 +411,10 @@ private static ResourceProfile 
fulfilOneOfTwoSlotRequestsAndGetPendingProfile(
                return 
requests.get(slotRequestId2).getSlotProfile().getPhysicalSlotResourceProfile();
        }
 
-       private enum PhysicalSlotFutureCompletionMode {
+       /**
+        * Enum for covering different ways of a Future to complete.
+        */
+       public enum PhysicalSlotFutureCompletionMode {
                SUCCESS,
                FAILURE,
                MANUAL,

Review comment:
       Hmm, shouldn't this enum be part of the 
`TestingPhysicalSlotProvider.java` file?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
##########
@@ -215,22 +182,12 @@ public void testCanceledExecutionReturnsSlot() throws 
Exception {
                        }
                );
 
-               slotRequestIdFuture.thenAcceptAsync(
-                       (SlotRequestId slotRequestId) -> {
-                               final SingleLogicalSlot singleLogicalSlot = 
createSingleLogicalSlot(
-                                       slotOwner,
-                                       taskManagerGateway,
-                                       slotRequestId);
-                               slotProvider.complete(slotRequestId, 
singleLogicalSlot);
-                       },
-                       testMainThreadUtil.getMainThreadExecutor());
-
                testMainThreadUtil.execute(scheduler::startScheduling);
 
                // cancel the execution in case we could schedule the execution
                testMainThreadUtil.execute(execution::cancel);
 
-               assertThat(returnedSlotFuture.get(), 
is(equalTo(slotRequestIdFuture.get())));
+               assertThat(physicalSlotProvider.getRequests().keySet(), 
is(physicalSlotProvider.getCancellations().keySet()));

Review comment:
       Much nicer :-)

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -136,6 +136,16 @@ public static DefaultSchedulerBuilder 
newSchedulerBuilderWithSlotSharingExecutio
                return newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(
                        jobGraph,
                        new 
TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode.SUCCESS),
+                       allocationTimeout);
+       }
+
+       public static DefaultSchedulerBuilder 
newSchedulerBuilderWithSlotSharingExecutionSlotAllocator(

Review comment:
       This method could be used in `ExecutionPartitionLifecycleTest.java` 
instead.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -401,6 +401,58 @@ public static ExecutionAttemptID 
getAttemptId(DefaultScheduler scheduler, JobVer
                }
        }
 
+       public static SlotSharingExecutionSlotAllocatorFactoryBuilder 
slotSharingExecutionSlotAllocatorFactoryBuilder() {

Review comment:
       Maybe name `newSlotSharingExecutionSlotAllocatorFactoryBuilder`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -76,8 +91,10 @@ void completePhysicalSlotFutureFor(
                AllocationID allocationID) {
                ResourceProfile resourceProfile = 
requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile();
                SharedSlotTestingUtils.TestingPhysicalSlot physicalSlot = new 
SharedSlotTestingUtils.TestingPhysicalSlot(
-                       resourceProfile,
-                       allocationID);
+                       allocationID,
+                       taskManagerLocation,
+                       taskManagerGateway,
+                       resourceProfile);

Review comment:
       Nit: Maybe it would be simpler to provide a 
`TestingPhysicalSlotFactory`. Then, the `TestingPhysicalSlotProvider` wouldn't 
have to know about the `taskManagerLocation` and the `taskManagerGateway`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
##########
@@ -160,14 +157,6 @@ private void finishExecution(
                ).join();
        }
 
-       @Nonnull
-       private PhysicalSlotProvider createSlotProvider(SlotPool slotPool, 
ComponentMainThreadExecutor mainThreadExecutor) {
-               PhysicalSlotProviderImpl slotProvider = new 
PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(),
 slotPool);
-               // scheduler.start(mainThreadExecutor);

Review comment:
       Yes, it should be fine to remove the start call as it only sets the main 
thread executor of the old `Scheduler` implementations.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
##########
@@ -161,11 +161,11 @@ private void finishExecution(
        }
 
        @Nonnull
-       private SlotProvider createSlotProvider(SlotPool slotPool, 
ComponentMainThreadExecutor mainThreadExecutor) {
-               final SchedulerImpl scheduler = new 
SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), 
slotPool);
-               scheduler.start(mainThreadExecutor);
+       private PhysicalSlotProvider createSlotProvider(SlotPool slotPool, 
ComponentMainThreadExecutor mainThreadExecutor) {
+               PhysicalSlotProviderImpl slotProvider = new 
PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(),
 slotPool);
+               // scheduler.start(mainThreadExecutor);

Review comment:
       Can this be removed?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
##########
@@ -118,7 +118,7 @@ public void 
testSchedulingOfJobWithFewerSlotsThanParallelism() throws Exception
                                Collections.singletonList(ResourceProfile.ANY),
                                new 
RpcTaskManagerGateway(testingTaskExecutorGateway, JobMasterId.generate()));
 
-                       final SlotProvider slotProvider = 
createSlotProvider(slotPool, mainThreadExecutor);
+                       final PhysicalSlotProvider slotProvider = 
createSlotProvider(slotPool, mainThreadExecutor);

Review comment:
       Maybe rename `createSlotProvider` into `createPhysicalSlotProvider`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -85,13 +95,16 @@ private 
TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.Physic
        void completePhysicalSlotFutureFor(
                SlotRequestId slotRequestId,
                AllocationID allocationID) {
-               ResourceProfile resourceProfile = 
requests.get(slotRequestId).getSlotProfile().getPhysicalSlotResourceProfile();
-               SharedSlotTestingUtils.TestingPhysicalSlot physicalSlot = new 
SharedSlotTestingUtils.TestingPhysicalSlot(
-                       allocationID,
-                       taskManagerLocation,
-                       taskManagerGateway,
-                       resourceProfile);
-               responses.get(slotRequestId).complete(physicalSlot);
+               synchronized (lock) {

Review comment:
       If concurrency is a problem, shouldn't we also synchronize 
`allocatePhysicalSlot`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -132,4 +132,47 @@ PhysicalSlotRequest getFirstRequestOrFail() {
                Preconditions.checkState(element.isPresent());
                return element.get();
        }
+
+       public static Builder builder() {
+               return new Builder();
+       }
+
+       /**
+        * {@code Builder} for creating {@code TestingPhysicalSlotProvider} 
instances.
+        */
+       public static class Builder {

Review comment:
       Nice :-)

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -125,19 +129,23 @@ PhysicalSlotRequest getFirstRequestOrFail() {
                return getFirstElementOrFail(requests.values());
        }
 
-       Map<SlotRequestId, PhysicalSlotRequest> getRequests() {
+       public CountDownLatch getAllocateLatch() {
+               return allocateLatch;
+       }

Review comment:
       If it is not important that we are using a latch, we could hide this 
detail by renaming this method into `waitUntilAllSlotsAreSuccessfullyAllocated` 
and then calling `allocateLatch.await()` inside of this method.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -104,6 +107,7 @@ void completePhysicalSlotFutureFor(
                                resourceProfile);
                        responses.get(slotRequestId).complete(physicalSlot);
                        availableSlotCount--;
+                       allocateLatch.countDown();

Review comment:
       What are the exact semantics of the `allocateLatch`? Is it triggered once 
all slot requests have been successfully fulfilled?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
##########
@@ -578,21 +576,23 @@ public void 
testExecutionGraphIsDeployedInTopologicalOrder() throws Exception {
                        return 
CompletableFuture.completedFuture(Acknowledge.get());
                });
 
+               final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
                final TestingTaskExecutorGateway taskExecutorGateway = 
testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway();
                final RpcTaskManagerGateway taskManagerGateway = new 
RpcTaskManagerGateway(taskExecutorGateway, JobMasterId.generate());
 
-               final Collection<CompletableFuture<LogicalSlot>> slotFutures = 
new ArrayList<>(numberTasks);
-               for (int i = 0; i < numberTasks; i++) {
-                       slotFutures.add(new CompletableFuture<>());
-               }
-
-               final SlotProvider slotProvider = new 
IteratorTestingSlotProvider(slotFutures.iterator());
+               final Collection<CompletableFuture<TestingPhysicalSlot>> 
slotFutures = new ArrayList<>(numberTasks);
 
                final JobGraph jobGraph = new JobGraph(jobId, "Test Job", 
sourceVertex, sinkVertex);
                jobGraph.setScheduleMode(ScheduleMode.EAGER);
 
-               final SchedulerBase scheduler = SchedulerTestingUtils
-                       .newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, 
slotProvider)
+               final SchedulerBase scheduler = 
SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+                       
.setExecutionSlotAllocatorFactory(SchedulerTestingUtils.slotSharingExecutionSlotAllocatorFactoryBuilder()
+                               
.withSlotProvider(TestingPhysicalSlotProvider.builder()
+                                       .withPhysicalSlotCreator((f, a, tml, 
tmg, r) -> slotFutures.add(f))

Review comment:
       Ah ok, now I see why you have introduced the consumer. I am not 100% 
sold that this is the best way to achieve it. Maybe we can collect all the 
`SlotRequestIds` from the `TestingPhysicalSlotProvider` and then call 
`TestingPhysicalSlotProvider.completePhysicalSlotFutureFor` in a random order?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -56,8 +57,11 @@
        private final CountDownLatch allocateLatch;
        private volatile int availableSlotCount;
 
-       private 
TestingPhysicalSlotProvider(SlotSharingExecutionSlotAllocatorTest.PhysicalSlotFutureCompletionMode
 physicalSlotFutureCompletion, TaskManagerLocation taskManagerLocation, 
TaskManagerGateway taskManagerGateway, int availableSlotCount) {
+       private final QuinConsumer<CompletableFuture<TestingPhysicalSlot>, 
AllocationID, TaskManagerLocation, TaskManagerGateway, ResourceProfile> 
physicalSlotCreator;

Review comment:
       Instead of completing the future inside of the consumer, we could also 
introduce a `TestingPhysicalSlot` supplier which creates the 
`TestingPhysicalSlot` and have the `TestingPhysicalSlotProvider` decide where to 
register it.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
##########
@@ -196,9 +201,15 @@ public Builder withAvailableSlotCount(int 
availableSlotCount) {
                        return this;
                }
 
+               public Builder 
withPhysicalSlotCreator(QuinConsumer<CompletableFuture<TestingPhysicalSlot>, 
AllocationID, TaskManagerLocation, TaskManagerGateway, ResourceProfile> 
physicalSlotCreator) {

Review comment:
       I would suggest not using side effects to transfer the created 
`TestingPhysicalSlot` out of the `physicalSlotCreator` and instead having 
something like `BiFunction<AllocationID, ResourceProfile, 
TestingPhysicalSlot>`, potentially even a `BiFunctionWithException` if we also 
want to allow the creation method to fail.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
##########
@@ -179,10 +176,10 @@ private SchedulerBase createScheduler(
                        final JobVertex... vertices) throws Exception {
 
                final JobGraph jobGraph = new JobGraph(new JobID(), "test job", 
vertices);
-               final SlotProvider slotProvider = new TestingSlotProvider(
-                       ignored -> CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot()));
-               final SchedulerBase scheduler = SchedulerTestingUtils
-                       .newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, 
slotProvider, Time.seconds(10))
+               final SchedulerBase scheduler = 
SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+                       
.setExecutionSlotAllocatorFactory(SchedulerTestingUtils.slotSharingExecutionSlotAllocatorFactoryBuilder()

Review comment:
       Why is the allocation timeout of 10s important? The test also passes 
with `withAllocationTimeout` removed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingLocalInputPreferredSlotSharingStrategyFactory.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Set;
+
+/**
+ * {@link LocalInputPreferredSlotSharingStrategy.Factory} extension that 
returns the most recently
+ * created {@link LocalInputPreferredSlotSharingStrategy} instance for testing 
purposes.
+ */
+public class TestingLocalInputPreferredSlotSharingStrategyFactory extends 
LocalInputPreferredSlotSharingStrategy.Factory {

Review comment:
       I think this is not needed if we test the `TaskManagerLocations` instead 
of the `ExecutionSlotSharingGroupIDs`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
##########
@@ -130,22 +132,23 @@ public void testConstraintsAfterRestart() throws 
Exception {
                        timeout);
 
                //checking execution vertex properties
-               validateConstraints(eg);
+               validateConstraints(eg, 
slotSharingStrategyFactory.getMostRecentStrategyInstance());
 
                ExecutionGraphTestUtils.finishAllVertices(eg);
 
                assertThat(eg.getState(), is(FINISHED));
        }
 
-       private void validateConstraints(ExecutionGraph eg) {
+       private void validateConstraints(ExecutionGraph eg, SlotSharingStrategy 
slotSharingStrategy) {
 
                ExecutionJobVertex[] tasks = 
eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
 
                for (int i = 0; i < NUM_TASKS; i++) {
-                       CoLocationConstraint constr1 = 
tasks[0].getTaskVertices()[i].getLocationConstraint();
-                       CoLocationConstraint constr2 = 
tasks[1].getTaskVertices()[i].getLocationConstraint();
-                       assertThat(constr1.isAssigned(), is(true));
-                       assertThat(constr1.getLocation(), 
equalTo(constr2.getLocation()));
+                       ExecutionVertexID executionVertexID0 = 
tasks[0].getTaskVertices()[i].getID();
+                       ExecutionVertexID executionVertexID1 = 
tasks[1].getTaskVertices()[i].getID();
+
+                       
assertThat(slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID0).getExecutionVertexIds(),
 hasItem(executionVertexID1));
+                       
assertThat(slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID1).getExecutionVertexIds(),
 hasItem(executionVertexID0));

Review comment:
       Instead of relying on internal details, I think it would be good enough 
to assert the following:
   
   ```
   final TaskManagerLocation location0 = tasks[0].getTaskVertices()[i]
                                .getCurrentAssignedResourceLocation();
                        final TaskManagerLocation location1 = 
tasks[1].getTaskVertices()[i]
                                .getCurrentAssignedResourceLocation();
   
                        assertThat(location0, is(equalTo(location1)));
   ```
   
   Moreover, we should set the test up in a way that every `PhysicalSlot` has a 
different `TaskManagerLocation`. That way we ensure that the colocation 
constraints are honored.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to