XComp commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r844824972
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -416,22 +410,18 @@ public void
testAllocatedSlotReportDoesNotContainStaleInformation() throws Excep
rpcService.registerGateway(taskExecutorGateway.getAddress(),
taskExecutorGateway);
- final JobManagerSharedServices jobManagerSharedServices =
Review Comment:
That's a weird one. It was never actually used even when the test was
added... 🤔
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java:
##########
@@ -65,29 +65,37 @@ public void testIfStartSchedulingFailsJobMasterFails()
throws Exception {
final SchedulerNGFactory schedulerFactory = new
FailingSchedulerFactory();
final JobMasterBuilder.TestingOnCompletionActions onCompletionActions =
new JobMasterBuilder.TestingOnCompletionActions();
- final JobMaster jobMaster =
- new JobMasterBuilder(
- JobGraphTestUtils.emptyJobGraph(),
-
TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService())
- .withSlotPoolServiceSchedulerFactory(
- DefaultSlotPoolServiceSchedulerFactory.create(
-
TestingSlotPoolServiceBuilder.newBuilder(),
- schedulerFactory))
- .withOnCompletionActions(onCompletionActions)
- .createJobMaster();
+ final JobManagerSharedServices jobManagerSharedServices =
+ new TestingJobManagerSharedServicesBuilder().build();
+ try {
+ final JobMaster jobMaster =
+ new JobMasterBuilder(
+ JobGraphTestUtils.emptyJobGraph(),
+
TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService())
+ .withSlotPoolServiceSchedulerFactory(
+
DefaultSlotPoolServiceSchedulerFactory.create(
+
TestingSlotPoolServiceBuilder.newBuilder(),
+ schedulerFactory))
+ .withOnCompletionActions(onCompletionActions)
+
.withJobManagerSharedServices(jobManagerSharedServices)
+ .createJobMaster();
- jobMaster.start();
+ jobMaster.start();
- assertThat(
- onCompletionActions.getJobMasterFailedFuture().join(),
- is(instanceOf(JobMasterException.class)));
+ assertThat(
+ onCompletionActions.getJobMasterFailedFuture().join(),
+ is(instanceOf(JobMasterException.class)));
- // close the jobMaster to remove it from the testing rpc service so
that it can shut down
- // cleanly
- try {
- jobMaster.close();
- } catch (Exception expected) {
- // expected
+ // close the jobMaster to remove it from the testing rpc service
so that it can shut
+ // down
+ // cleanly
Review Comment:
```suggestion
// down cleanly
```
##########
flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java:
##########
@@ -415,42 +324,6 @@ public void testOrTimeout() throws Exception {
}
}
- @Test
- public void testRetryWithDelayAndPredicate() throws Exception {
Review Comment:
I couldn't find a test where we verify the error Predicate condition like we
do here. Can you point me to it?
##########
flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java:
##########
@@ -316,38 +257,6 @@ public void testRetryWithDelayRetryStrategy() throws
Exception {
completionTime >= (2 + 4 + 5 + 5));
}
- /** Tests that all scheduled tasks are canceled if the retry future is
being cancelled. */
- @Test
- public void testRetryWithDelayCancellation() {
- final ManuallyTriggeredScheduledExecutor scheduledExecutor =
- new ManuallyTriggeredScheduledExecutor();
-
- CompletableFuture<?> retryFuture =
- FutureUtils.retryWithDelay(
- () ->
- FutureUtils.completedExceptionally(
- new FlinkException("Test exception")),
- 1,
- TestingUtils.infiniteTime(),
- scheduledExecutor);
-
- assertFalse(retryFuture.isDone());
-
- final Collection<ScheduledFuture<?>> scheduledTasks =
- scheduledExecutor.getActiveScheduledTasks();
-
- assertFalse(scheduledTasks.isEmpty());
-
- final ScheduledFuture<?> scheduledFuture =
scheduledTasks.iterator().next();
-
- assertFalse(scheduledFuture.isDone());
Review Comment:
I saw `testScheduleWithDelayCancellation` covering this functionality.
Therefore, removing the test is fine. I'm just wondering whether we should move
this part into `testScheduleWithDelayCancellation` to have it still covered (or
maybe, create a separate test that checks that the task is scheduled).
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java:
##########
@@ -555,10 +562,11 @@ public void
testSavepointRescalingPartitionedOperatorState(
() ->
client.triggerSavepoint(
jobID, null,
SavepointFormatType.CANONICAL),
- (int) deadline.timeLeft().getSeconds() / 10,
- Time.seconds(10),
+ new FixedRetryStrategy(
+ (int) deadline.timeLeft().getSeconds() /
10,
+ Duration.ofSeconds(60)),
Review Comment:
Did you change the delay on purpose from 10 to 60 seconds?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java:
##########
@@ -157,46 +160,49 @@ public void
testExecutionDeploymentReconciliationForPendingExecution() throws Ex
TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper =
new TestingExecutionDeploymentTrackerWrapper();
final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
- JobMaster jobMaster =
createAndStartJobMaster(deploymentTrackerWrapper, jobGraph);
- JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
- RPC_SERVICE_RESOURCE
- .getTestingRpcService()
- .registerGateway(jobMasterGateway.getAddress(),
jobMasterGateway);
-
- final CompletableFuture<ExecutionAttemptID> taskSubmissionFuture =
- new CompletableFuture<>();
- final CompletableFuture<ExecutionAttemptID> taskCancellationFuture =
- new CompletableFuture<>();
- final CompletableFuture<Acknowledge> taskSubmissionAcknowledgeFuture =
- new CompletableFuture<>();
- TaskExecutorGateway taskExecutorGateway =
- createTaskExecutorGateway(
- taskCancellationFuture,
- taskSubmissionFuture,
- taskSubmissionAcknowledgeFuture);
- LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation =
- new LocalUnresolvedTaskManagerLocation();
-
- registerTaskExecutorAndOfferSlots(
- jobMasterGateway,
- jobGraph.getJobID(),
- taskExecutorGateway,
- localUnresolvedTaskManagerLocation);
-
- ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get();
-
- // the execution has not been acknowledged yet by the TaskExecutor,
but we already allow the
- // ID to be in the heartbeat payload
- jobMasterGateway.heartbeatFromTaskManager(
- localUnresolvedTaskManagerLocation.getResourceID(),
- new TaskExecutorToJobManagerHeartbeatPayload(
- new AccumulatorReport(Collections.emptyList()),
- new
ExecutionDeploymentReport(Collections.singleton(pendingExecutionId))));
-
- taskSubmissionAcknowledgeFuture.complete(Acknowledge.get());
-
- deploymentTrackerWrapper.getTaskDeploymentFuture().get();
- assertFalse(taskCancellationFuture.isDone());
+ try (JobMaster jobMaster =
createAndStartJobMaster(deploymentTrackerWrapper, jobGraph)) {
+ JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
+ RPC_SERVICE_RESOURCE
+ .getTestingRpcService()
+ .registerGateway(jobMasterGateway.getAddress(),
jobMasterGateway);
+
+ final CompletableFuture<ExecutionAttemptID> taskSubmissionFuture =
+ new CompletableFuture<>();
+ final CompletableFuture<ExecutionAttemptID> taskCancellationFuture
=
+ new CompletableFuture<>();
+ final CompletableFuture<Acknowledge>
taskSubmissionAcknowledgeFuture =
+ new CompletableFuture<>();
+ TaskExecutorGateway taskExecutorGateway =
+ createTaskExecutorGateway(
+ taskCancellationFuture,
+ taskSubmissionFuture,
+ taskSubmissionAcknowledgeFuture);
+ LocalUnresolvedTaskManagerLocation
localUnresolvedTaskManagerLocation =
+ new LocalUnresolvedTaskManagerLocation();
+
+ registerTaskExecutorAndOfferSlots(
+ jobMasterGateway,
+ jobGraph.getJobID(),
+ taskExecutorGateway,
+ localUnresolvedTaskManagerLocation);
+
+ ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get();
+
+ // the execution has not been acknowledged yet by the
TaskExecutor, but we already allow
+ // the
+ // ID to be in the heartbeat payload
Review Comment:
```suggestion
// the ID to be in the heartbeat payload
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]