rmetzger commented on a change in pull request #13639:
URL: https://github.com/apache/flink/pull/13639#discussion_r504731335
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -280,6 +286,65 @@ public static void teardownClass() {
}
}
+ /**
+ * This test ensures that the JobMaster's bookkeeping of TaskExecutors correctly
+ * handles a TaskExecutor that re-registers with the same ID. FLINK-19237 was a bug
+ * where the TaskExecutors and the SlotPool got out of sync, causing slot offers
+ * to be rejected.
+ */
+ @Test
+ public void testAcceptSlotOfferAfterLeaderchange() throws Exception {
+
+ final JobManagerSharedServices jobManagerSharedServices = new
TestingJobManagerSharedServicesBuilder().build();
+ final JobMasterConfiguration jobMasterConfiguration =
JobMasterConfiguration.fromConfiguration(configuration);
+
+ final SchedulerNGFactory schedulerNGFactory =
SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+
+ final TestingJobMaster jobMaster = new TestingJobMaster(
+ rpcService,
+ jobMasterConfiguration,
+ jmResourceId,
+ jobGraph,
+ haServices,
+ SlotPoolFactory.fromConfiguration(configuration),
+ jobManagerSharedServices,
+ heartbeatServices,
+ UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+ new JobMasterBuilder.TestingOnCompletionActions(),
+ testingFatalErrorHandler,
+ JobMasterTest.class.getClassLoader(),
+ schedulerNGFactory,
+ NettyShuffleMaster.INSTANCE,
+ NoOpJobMasterPartitionTracker.FACTORY,
+ new DefaultExecutionDeploymentTracker(),
+ DefaultExecutionDeploymentReconciler::new,
+ System.currentTimeMillis());
+
+ jobMaster.start(jobMasterId).get();
+
+ log.info("Register TaskManager");
+
+ String testingTaskManagerAddress = "fake";
+ UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
new LocalUnresolvedTaskManagerLocation();
+ TestingTaskExecutorGateway testingTaskExecutorGateway = new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+ rpcService.registerGateway(testingTaskManagerAddress,
testingTaskExecutorGateway);
+
Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress,
unresolvedTaskManagerLocation, testingTimeout).get(),
instanceOf(RegistrationResponse.Success.class));
+
+ log.info("Revoke leadership & re-grant leadership");
+ jobMaster.suspend(new FlinkException("Lost leadership")).get();
+
+ jobMaster.start(JobMasterId.generate()).get();
+
+ log.info("re-register same TaskManager");
+
Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress,
unresolvedTaskManagerLocation, testingTimeout).get(),
instanceOf(RegistrationResponse.Success.class));
+
+ log.info("Ensure JobMaster accepts slot offer");
+ final SlotOffer slotOffer = new SlotOffer(new AllocationID(),
0, ResourceProfile.ANY);
+
+ Collection<SlotOffer> acceptedSlots =
jobMaster.executeInMainThreadExecutor(() ->
jobMaster.offerSlots(unresolvedTaskManagerLocation.getResourceID(),
Collections.singleton(slotOffer), testingTimeout).get()).get();
+ Assert.assertThat(acceptedSlots.size(), is(1));
+
Review comment:
I will remove this empty line before merging or when addressing comments.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org