zhuzhurk commented on a change in pull request #16485:
URL: https://github.com/apache/flink/pull/16485#discussion_r669283302



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
##########
@@ -194,9 +204,27 @@ public void testCancelWhileRestarting() throws Exception {
         }
     }
 
+    private void offerSlots(SlotPool slotPool, int numSlots) {
+        offerSlots(slotPool, new LocalTaskManagerLocation(), numSlots);
+    }
+
+    private void offerSlots(

Review comment:
       This and the above method can be static.
   Or maybe we can also move them into `SlotPoolUtils`.
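
A minimal sketch of the static variant, using hypothetical stand-in types (`FakeSlotPool`, `FakeLocation`, `SlotOfferingSketch`) rather than the real `SlotPool` and `LocalTaskManagerLocation`. The second overload's parameter list is an assumption inferred from the call in the first one, since the diff above cuts off at its signature:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task manager location; not the Flink type.
final class FakeLocation {}

// Hypothetical stand-in for SlotPool; it only records which slots were offered.
final class FakeSlotPool {
    final List<FakeLocation> offered = new ArrayList<>();

    void offerSlot(FakeLocation location) {
        offered.add(location);
    }
}

// The helpers read no instance state, so they can be static and could live
// in a shared utility class instead of the test class itself.
final class SlotOfferingSketch {
    private SlotOfferingSketch() {}

    static void offerSlots(FakeSlotPool slotPool, int numSlots) {
        offerSlots(slotPool, new FakeLocation(), numSlots);
    }

    // Assumed signature: (pool, location, count), mirroring the call above.
    static void offerSlots(FakeSlotPool slotPool, FakeLocation location, int numSlots) {
        for (int i = 0; i < numSlots; i++) {
            slotPool.offerSlot(location);
        }
    }
}
```

Because nothing here depends on the enclosing test instance, other tests could reuse the helpers without subclassing.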

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
##########
@@ -328,18 +361,17 @@ public void testFailingExecutionAfterRestart() throws Exception {
 
     /**
      * Tests that a graph is not restarted after cancellation via a call to {@link
-     * ExecutionGraph#failGlobal(Throwable)}. This can happen when a slot is released concurrently
-     * with cancellation.
+     * ExecutionGraph#failJob(Throwable, long)}. This can happen when a slot is released

Review comment:
       I think `ExecutionGraph#failJob(Throwable, long)` should be replaced with 
`Execution#fail(Throwable)` here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
##########
@@ -141,43 +134,6 @@ public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() th
         }
     }
 
-    /**
-     * Tests that a batch slot request does not react to {@link
-     * SlotPool#failAllocation(AllocationID, Exception)} signals whose exception is not {@link
-     * UnfulfillableSlotRequestException}.
-     */
-    @Test
-    public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exception {
-        final TestingResourceManagerGateway testingResourceManagerGateway =
-                new TestingResourceManagerGateway();
-        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
-        testingResourceManagerGateway.setRequestSlotConsumer(
-                slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
-
-        final ComponentMainThreadExecutor directMainThreadExecutor =
-                ComponentMainThreadExecutorServiceAdapter.forMainThread();
-
-        final Time batchSlotTimeout = Time.milliseconds(1000L);
-        try (final SlotPoolImpl slotPool =
-                createAndSetUpSlotPool(
-                        directMainThreadExecutor,
-                        testingResourceManagerGateway,
-                        batchSlotTimeout)) {
-
-            final CompletableFuture<PhysicalSlot> slotFuture =
-                    SlotPoolUtils.requestNewAllocatedBatchSlot(
-                            slotPool, directMainThreadExecutor, resourceProfile);
-
-            SlotPoolUtils.failAllocation(
-                    slotPool,
-                    directMainThreadExecutor,
-                    allocationIdFuture.get(),
-                    new FlinkException("Failed request"));
-
-            assertThat(slotFuture.isDone(), is(false));
-        }
-    }
-
     /**
      * Tests that a batch slot request does react to {@link SlotPool#failAllocation(AllocationID,

Review comment:
       The Javadoc here is outdated.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -488,6 +488,17 @@ public int getNumPendingRequests() {
         return pendingRequests.size();
     }
 
+    @VisibleForTesting
+    void runBatchSlotTimeoutCheck() {

Review comment:
       How about making `checkBatchSlotTimeout()` package-private and moving this 
method into `SlotPoolBatchSlotRequestTest.java`? That way we can avoid adding a 
test-only method to production code.
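
Roughly what I have in mind, sketched with hypothetical stand-in classes (`BridgeSketch`, `BridgeSketchTest`) instead of the real `DeclarativeSlotPoolBridge`. The timeout logic here is a placeholder; only the visibility pattern is the point:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the production class.
class BridgeSketch {
    final List<String> timedOut = new ArrayList<>();
    private final List<String> pendingRequests = new ArrayList<>();

    void addPendingRequest(String id) {
        pendingRequests.add(id);
    }

    // Package-private rather than private: a test in the same package can
    // call it directly, so no test-only wrapper is needed in production code.
    // Placeholder logic: every pending request is treated as timed out.
    void checkBatchSlotTimeout() {
        timedOut.addAll(pendingRequests);
        pendingRequests.clear();
    }
}

// Stand-in for the test class, assumed to live in the same package.
class BridgeSketchTest {
    // The helper that triggers the check now lives on the test side.
    static void runBatchSlotTimeoutCheck(BridgeSketch bridge) {
        bridge.checkBatchSlotTimeout();
    }
}
```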

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
##########
@@ -74,56 +60,18 @@ public void setup(JobConfiguration jobConfiguration) throws Exception {
         final List<JobVertex> jobVertices = createDefaultJobVertices(jobConfiguration);
         jobGraph = createJobGraph(jobVertices, jobConfiguration);
 
-        physicalSlotProvider =
-                createPhysicalSlotProvider(
-                        jobConfiguration, jobVertices.size(), mainThreadExecutor);
+        physicalSlotProvider = createPhysicalSlotProvider(mainThreadExecutor);
     }
 
     private static PhysicalSlotProvider createPhysicalSlotProvider(
-            JobConfiguration jobConfiguration,
-            int numberOfJobVertices,
-            ComponentMainThreadExecutor mainThreadExecutor)
-            throws Exception {
-        final int slotPoolSize = jobConfiguration.getParallelism() * numberOfJobVertices;
+            ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
 
-        final SlotPoolImpl slotPool = new SlotPoolBuilder(mainThreadExecutor).build();
-        final TestingTaskExecutorGateway testingTaskExecutorGateway =
-                new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
-        offerSlots(
-                slotPool,
-                new RpcTaskManagerGateway(testingTaskExecutorGateway, JobMasterId.generate()),
-                slotPoolSize,
-                mainThreadExecutor);
+        final SlotPool slotPool = new DeclarativeSlotPoolBridgeBuilder(mainThreadExecutor).build();
 
         return new PhysicalSlotProviderImpl(

Review comment:
       I think we need to keep the `offerSlots()` call.
   The `SchedulingAndDeployingBenchmark` relies on these pre-assigned slots to 
test the performance of task scheduling, slot allocation (from `SlotPool`), and 
task deployment.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
##########
@@ -224,50 +175,16 @@ public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exce
         testingResourceManagerGateway.setRequestSlotFuture(

Review comment:
       This case also seems to be invalid, because 
`ResourceManagerGateway#requestSlot(...)` is only used by `SlotPoolImpl`. Or 
maybe we can keep it and adapt it to test the case where 
`ResourceManagerGateway#declareRequiredResources(...)` completes exceptionally?




