tillrohrmann commented on a change in pull request #15812:
URL: https://github.com/apache/flink/pull/15812#discussion_r623767811



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
##########
@@ -97,8 +103,10 @@ private void executeOperationForAllExecutions(
         }
     }
 
-    private SlotPoolImpl createSlotPoolImpl() {
-        return new TestingSlotPoolImpl(TEST_JOB_ID);
+    private SlotPool createSlotPoolImpl() {
+        return new 
DeclarativeSlotPoolBridgeServiceFactory(SystemClock.getInstance(), 
DEFAULT_RPC_TIME_OUT, DEFAULT_IDLE_SLOT_TIME_OUT, 
DEFAULT_BATCH_TIME_OUT).createSlotPoolService(TEST_JOB_ID).castInto(SlotPool.class).orElseThrow(()
 ->
+                new IllegalStateException(
+                        "The AdaptiveScheduler requires a 
DeclarativeSlotPool."));

Review comment:
       Can we simply create a `DeclarativeSlotPoolBridge` here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -140,11 +140,11 @@ public void testSlotAllocationTimeout() throws Exception {
             // wait until we have timed out the slot request
             slotRequestTimeoutFuture.get();
 
-            assertEquals(0L, pool.getNumberOfPendingRequests());
+            assertEquals(0L, pool.getAllocatedSlotsInformation().size());

Review comment:
       Same here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolServiceFactory.java
##########
@@ -38,6 +38,7 @@ public DefaultSlotPoolServiceFactory(
     @Override
     @Nonnull
     public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) {
-        return new SlotPoolImpl(jobId, clock, rpcTimeout, slotIdleTimeout, 
batchSlotTimeout);
+        return new DeclarativeSlotPoolService(

Review comment:
       Yes, I think we should remove this factory as well.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolServiceFactory.java
##########
@@ -38,6 +38,7 @@ public DefaultSlotPoolServiceFactory(
     @Override
     @Nonnull
     public SlotPoolService createSlotPoolService(@Nonnull JobID jobId) {
-        return new SlotPoolImpl(jobId, clock, rpcTimeout, slotIdleTimeout, 
batchSlotTimeout);
+        return new DeclarativeSlotPoolService(

Review comment:
       I think we can then also remove 
`ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT` and the test profile 
using `flink.tests.disable-declarative` because we effectively remove the 
non-declarative resource management. The test profile should be in 
`jobs-template.yml` under `legacy_slot_management`.
   
   Also, `ResourceManagerGateway.requestSlot` should be removed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -181,7 +180,7 @@ public void testExtraSlotsAreKept() throws Exception {
             // wait until we have timed out the slot request
             slotRequestTimeoutFuture.get();
 
-            assertEquals(0L, pool.getNumberOfPendingRequests());
+            assertEquals(0L, pool.getAllocatedSlotsInformation().size());

Review comment:
       Same here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -54,12 +55,12 @@ private SlotPoolUtils() {
         throw new UnsupportedOperationException("Cannot instantiate this 
class.");
     }
 
-    static TestingSlotPoolImpl createAndSetUpSlotPool(
+    static SlotPool createAndSetUpSlotPool(
             @Nullable final ResourceManagerGateway resourceManagerGateway) 
throws Exception {
-
-        return new 
SlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread())
-                .setResourceManagerGateway(resourceManagerGateway)
-                .build();
+        return new DefaultSlotPoolServiceFactory(SystemClock.getInstance(), 
TIMEOUT, TIMEOUT, TIMEOUT).createSlotPoolService(new JobID())
+                .castInto(SlotPool.class).orElseThrow(() ->
+                        new IllegalStateException(
+                                "The DefaultScheduler requires a SlotPool."));

Review comment:
       Why can't we longer use the `SlotPoolBuilder` here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -108,19 +109,18 @@ public void 
testCancelSlotAllocationWithoutResourceManager() throws Exception {
             // wait for the timeout of the pending slot request
             timeoutFuture.get();
 
-            assertEquals(0L, pool.getNumberOfWaitingForResourceRequests());
+            assertEquals(0L, pool.getAllocatedSlotsInformation().size());

Review comment:
       I think this is not the correct translation. We need to check that the 
pending slot request has been removed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -190,22 +189,21 @@ public void testExtraSlotsAreKept() throws Exception {
 
             testMainThreadExecutor.execute(
                     () -> 
pool.registerTaskManager(taskManagerLocation.getResourceID()));
-
+                    ;
             assertTrue(
-                    testMainThreadExecutor.execute(
-                            () ->
-                                    pool.offerSlot(
-                                            taskManagerLocation, 
taskManagerGateway, slotOffer)));
+                    testMainThreadExecutor.execute(() ->
+                            pool.offerSlots(taskManagerLocation, 
taskManagerGateway,
+                                    Lists.newArrayList(slotOffer)) != null));
 
-            assertTrue(pool.containsAvailableSlot(allocationId));
+            
assertTrue(pool.getAllocatedSlotsInformation().contains(slotOffer));

Review comment:
       I think you should use `pool.getAvailableSlotsInformation` for this 
assertion.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerBenchmarkBase.java
##########
@@ -91,7 +91,9 @@ private static PhysicalSlotProvider 
createPhysicalSlotProvider(
             throws Exception {
         final int slotPoolSize = jobConfiguration.getParallelism() * 
numberOfJobVertices;
 
-        final SlotPoolImpl slotPool = new 
SlotPoolBuilder(mainThreadExecutor).build();
+        final SlotPool slotPool = new 
SlotPoolBuilder(mainThreadExecutor).build().castInto(SlotPool.class).orElseThrow(()
 ->

Review comment:
       Why is the `castInto` needed here? Shouldn't the builder return a 
`SlotPool`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to