zhuzhurk commented on a change in pull request #16485:
URL: https://github.com/apache/flink/pull/16485#discussion_r669283302
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
##########
@@ -194,9 +204,27 @@ public void testCancelWhileRestarting() throws Exception {
}
}
+ private void offerSlots(SlotPool slotPool, int numSlots) {
+ offerSlots(slotPool, new LocalTaskManagerLocation(), numSlots);
+ }
+
+ private void offerSlots(
Review comment:
This and the above method can be static.
Alternatively, we could move them into `SlotPoolUtils`.
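For illustration, here is a minimal sketch of the two helpers as static methods on a shared utility class. `OfferTarget` and `SlotOfferTestUtils` are hypothetical stand-ins (not Flink APIs) for `SlotPool` and `SlotPoolUtils`, and the task manager location is reduced to a string. The sketch only shows that neither overload touches instance state, so both can be static and reused from other tests.

```java
import java.util.UUID;

/** Hypothetical stand-in for the part of Flink's SlotPool that the helpers call. */
interface OfferTarget {
    /** Accepts slots offered by the given task manager; returns how many were accepted. */
    int offerSlots(String taskManagerLocation, int numSlots);
}

/**
 * Hypothetical utility class (standing in for SlotPoolUtils). Neither method reads
 * instance state, so both can be static.
 */
final class SlotOfferTestUtils {

    private SlotOfferTestUtils() {}

    /** Offers numSlots slots from a freshly generated "local" task manager location. */
    static int offerSlots(OfferTarget slotPool, int numSlots) {
        return offerSlots(slotPool, "local-" + UUID.randomUUID(), numSlots);
    }

    /** Offers numSlots slots from the given task manager location. */
    static int offerSlots(OfferTarget slotPool, String taskManagerLocation, int numSlots) {
        return slotPool.offerSlots(taskManagerLocation, numSlots);
    }
}
```

In the real test, the first argument would be the `SlotPool` and the generated location a `LocalTaskManagerLocation`, as in the diff above.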
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
##########
@@ -328,18 +361,17 @@ public void testFailingExecutionAfterRestart() throws Exception {
/**
* Tests that a graph is not restarted after cancellation via a call to {@link
- * ExecutionGraph#failGlobal(Throwable)}. This can happen when a slot is released concurrently
- * with cancellation.
+ * ExecutionGraph#failJob(Throwable, long)}. This can happen when a slot is released
Review comment:
I think `ExecutionGraph#failJob(Throwable, long)` should be changed to `Execution#fail(Throwable)` here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
##########
@@ -141,43 +134,6 @@ public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() th
}
}
- /**
- * Tests that a batch slot request does not react to {@link
- * SlotPool#failAllocation(AllocationID, Exception)} signals whose exception is not {@link
- * UnfulfillableSlotRequestException}.
- */
- @Test
- public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exception {
- final TestingResourceManagerGateway testingResourceManagerGateway =
- new TestingResourceManagerGateway();
- final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
- testingResourceManagerGateway.setRequestSlotConsumer(
- slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
-
- final ComponentMainThreadExecutor directMainThreadExecutor =
- ComponentMainThreadExecutorServiceAdapter.forMainThread();
-
- final Time batchSlotTimeout = Time.milliseconds(1000L);
- try (final SlotPoolImpl slotPool =
- createAndSetUpSlotPool(
- directMainThreadExecutor,
- testingResourceManagerGateway,
- batchSlotTimeout)) {
-
- final CompletableFuture<PhysicalSlot> slotFuture =
- SlotPoolUtils.requestNewAllocatedBatchSlot(
- slotPool, directMainThreadExecutor, resourceProfile);
-
- SlotPoolUtils.failAllocation(
- slotPool,
- directMainThreadExecutor,
- allocationIdFuture.get(),
- new FlinkException("Failed request"));
-
- assertThat(slotFuture.isDone(), is(false));
- }
- }
-
/**
* Tests that a batch slot request does react to {@link SlotPool#failAllocation(AllocationID,
Review comment:
This Javadoc is outdated.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
##########
@@ -488,6 +488,17 @@ public int getNumPendingRequests() {
return pendingRequests.size();
}
+ @VisibleForTesting
+ void runBatchSlotTimeoutCheck() {
Review comment:
How about making `checkBatchSlotTimeout()` package-private and moving this method into `SlotPoolBatchSlotRequestTest.java`? That way we can avoid adding a testing-only method to production code.
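A minimal sketch of the suggestion: the production class exposes only a package-private `checkBatchSlotTimeout(...)`, and the wrapper lives on the test side in the same package. `BatchSlotRequestTracker` and `BatchSlotTimeoutTestHelper` are hypothetical names, and the explicit `nowMillis` parameter replaces a real clock. The sketch also ignores the main-thread executor on which the real `DeclarativeSlotPoolBridge` would presumably run the check.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** Hypothetical production class: only the timeout check itself is exposed, package-private. */
class BatchSlotRequestTracker {

    /** A pending batch request and the time since which it could not be fulfilled. */
    static final class PendingRequest {
        final long unfulfillableSinceMillis;
        final CompletableFuture<String> slotFuture = new CompletableFuture<>();

        PendingRequest(long unfulfillableSinceMillis) {
            this.unfulfillableSinceMillis = unfulfillableSinceMillis;
        }
    }

    private final long batchSlotTimeoutMillis;
    private final List<PendingRequest> pendingRequests = new ArrayList<>();

    BatchSlotRequestTracker(long batchSlotTimeoutMillis) {
        this.batchSlotTimeoutMillis = batchSlotTimeoutMillis;
    }

    CompletableFuture<String> requestBatchSlot(long nowMillis) {
        PendingRequest request = new PendingRequest(nowMillis);
        pendingRequests.add(request);
        return request.slotFuture;
    }

    /** Package-private, no @VisibleForTesting wrapper: fails requests pending for too long. */
    void checkBatchSlotTimeout(long nowMillis) {
        Iterator<PendingRequest> it = pendingRequests.iterator();
        while (it.hasNext()) {
            PendingRequest request = it.next();
            if (nowMillis - request.unfulfillableSinceMillis >= batchSlotTimeoutMillis) {
                it.remove();
                request.slotFuture.completeExceptionally(
                        new TimeoutException("Pending batch slot request timed out"));
            }
        }
    }
}

/** Test-side helper in the same package (e.g. inside the test class). */
final class BatchSlotTimeoutTestHelper {
    private BatchSlotTimeoutTestHelper() {}

    static void runBatchSlotTimeoutCheck(BatchSlotRequestTracker tracker, long nowMillis) {
        tracker.checkBatchSlotTimeout(nowMillis);
    }
}
```

Because test sources usually share the package of the class under test, the helper can call the package-private method without widening its visibility in production code.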
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
##########
@@ -74,56 +60,18 @@ public void setup(JobConfiguration jobConfiguration) throws Exception {
final List<JobVertex> jobVertices = createDefaultJobVertices(jobConfiguration);
jobGraph = createJobGraph(jobVertices, jobConfiguration);
- physicalSlotProvider =
- createPhysicalSlotProvider(
- jobConfiguration, jobVertices.size(), mainThreadExecutor);
+ physicalSlotProvider = createPhysicalSlotProvider(mainThreadExecutor);
}
private static PhysicalSlotProvider createPhysicalSlotProvider(
- JobConfiguration jobConfiguration,
- int numberOfJobVertices,
- ComponentMainThreadExecutor mainThreadExecutor)
- throws Exception {
- final int slotPoolSize = jobConfiguration.getParallelism() * numberOfJobVertices;
+ ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
- final SlotPoolImpl slotPool = new SlotPoolBuilder(mainThreadExecutor).build();
- final TestingTaskExecutorGateway testingTaskExecutorGateway =
- new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
- offerSlots(
- slotPool,
- new RpcTaskManagerGateway(testingTaskExecutorGateway, JobMasterId.generate()),
- slotPoolSize,
- mainThreadExecutor);
+ final SlotPool slotPool = new DeclarativeSlotPoolBridgeBuilder(mainThreadExecutor).build();
return new PhysicalSlotProviderImpl(
Review comment:
I think we need to keep the `offerSlots()` action.
The `SchedulingAndDeployingBenchmark` relies on these pre-assigned slots to test the performance of task scheduling, slot allocation (from the `SlotPool`), and task deployment.
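To illustrate why the pre-offered slots matter, here is a minimal sketch of the removed setup. The pool is filled up front with `parallelism * numberOfJobVertices` idle slots, the same `slotPoolSize` formula as the removed code, so every allocation during the benchmark is served from the pool instead of waiting for slots to arrive. `PreOfferedSlotPool` and its methods are hypothetical and are not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Hypothetical slot pool: idle slots are handed out without contacting a resource manager. */
final class PreOfferedSlotPool {
    private final Deque<Integer> idleSlots = new ArrayDeque<>();
    private int nextSlotId = 0;

    /** Adds numSlots idle slots, playing the role of offerSlots() in the benchmark setup. */
    void offerSlots(int numSlots) {
        for (int i = 0; i < numSlots; i++) {
            idleSlots.add(nextSlotId++);
        }
    }

    /** Takes an idle slot if one exists; empty means the request would have to wait. */
    Optional<Integer> allocateIdleSlot() {
        return Optional.ofNullable(idleSlots.poll());
    }

    int numIdleSlots() {
        return idleSlots.size();
    }

    /** Mirrors the removed setup: one pre-offered slot per subtask of every job vertex. */
    static PreOfferedSlotPool createForBenchmark(int parallelism, int numberOfJobVertices) {
        PreOfferedSlotPool pool = new PreOfferedSlotPool();
        pool.offerSlots(parallelism * numberOfJobVertices);
        return pool;
    }
}
```

Without the `offerSlots(...)` call, `allocateIdleSlot()` in this sketch would return empty for every request, so the allocation step the benchmark is meant to measure would never succeed from the pool.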
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
##########
@@ -224,50 +175,16 @@ public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exce
testingResourceManagerGateway.setRequestSlotFuture(
Review comment:
This case also seems to be invalid because `ResourceManagerGateway#requestSlot(...)` is only used by `SlotPoolImpl`.
Alternatively, maybe we can keep this case to test a failing `ResourceManagerGateway#declareRequiredResources(...)` call?
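If we keep the case, here is a sketch of what it could check. It assumes that a failed `declareRequiredResources(...)` call is only logged or retried and does not fail pending batch requests; I have not verified this against `DeclarativeSlotPoolBridge`. The gateway returns an exceptionally completed future and the slot future stays pending, mirroring the `assertThat(slotFuture.isDone(), is(false))` check in the removed test. `ResourceDeclarationGateway` and `DeclaringSlotPool` are hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ResourceManagerGateway#declareRequiredResources(...). */
interface ResourceDeclarationGateway {
    CompletableFuture<Void> declareRequiredResources(int requiredSlots);
}

/**
 * Hypothetical slot pool: each new request re-declares the total requirement. A failed
 * declaration is only counted here (a real implementation might retry); it does not
 * complete the pending slot future.
 */
final class DeclaringSlotPool {
    private final ResourceDeclarationGateway gateway;
    private final AtomicInteger failedDeclarations = new AtomicInteger();
    private int requiredSlots;

    DeclaringSlotPool(ResourceDeclarationGateway gateway) {
        this.gateway = gateway;
    }

    CompletableFuture<String> requestBatchSlot() {
        CompletableFuture<String> slotFuture = new CompletableFuture<>();
        requiredSlots++;
        gateway.declareRequiredResources(requiredSlots)
                .whenComplete(
                        (ignored, failure) -> {
                            if (failure != null) {
                                failedDeclarations.incrementAndGet();
                            }
                        });
        return slotFuture;
    }

    int getFailedDeclarations() {
        return failedDeclarations.get();
    }
}
```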