rmetzger commented on a change in pull request #13639:
URL: https://github.com/apache/flink/pull/13639#discussion_r505317363



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -280,6 +286,65 @@ public static void teardownClass() {
                }
        }
 
+       /**
+        * This test ensures that the bookkeeping of TaskExecutors in the JobMaster handles cases where TaskExecutors with the same
+        * ID re-register properly. FLINK-19237 was a bug where the TaskExecutors and the SlotPool got out of sync, and
+        * slot offers were rejected.
+        */
+       @Test
+       public void testAcceptSlotOfferAfterLeaderchange() throws Exception {
+
+               final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+               final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
+               final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+
+               final TestingJobMaster jobMaster = new TestingJobMaster(
+                       rpcService,
+                       jobMasterConfiguration,
+                       jmResourceId,
+                       jobGraph,
+                       haServices,
+                       SlotPoolFactory.fromConfiguration(configuration),
+                       jobManagerSharedServices,
+                       heartbeatServices,
+                       UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+                       new JobMasterBuilder.TestingOnCompletionActions(),
+                       testingFatalErrorHandler,
+                       JobMasterTest.class.getClassLoader(),
+                       schedulerNGFactory,
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpJobMasterPartitionTracker.FACTORY,
+                       new DefaultExecutionDeploymentTracker(),
+                       DefaultExecutionDeploymentReconciler::new,
+                       System.currentTimeMillis());
+
+               jobMaster.start(jobMasterId).get();
+
+               log.info("Register TaskManager");
+
+               String testingTaskManagerAddress = "fake";
+               UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
+               TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+               rpcService.registerGateway(testingTaskManagerAddress, testingTaskExecutorGateway);
+               Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress, unresolvedTaskManagerLocation, testingTimeout).get(), instanceOf(RegistrationResponse.Success.class));
+
+               log.info("Revoke leadership & re-grant leadership");
+               jobMaster.suspend(new FlinkException("Lost leadership")).get();
+
+               jobMaster.start(JobMasterId.generate()).get();
+
+               log.info("re-register same TaskManager");
+               Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress, unresolvedTaskManagerLocation, testingTimeout).get(), instanceOf(RegistrationResponse.Success.class));
+
+               log.info("Ensure JobMaster accepts slot offer");
+               final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
+
+               Collection<SlotOffer> acceptedSlots = jobMaster.executeInMainThreadExecutor(() -> jobMaster.offerSlots(unresolvedTaskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get()).get();
+               Assert.assertThat(acceptedSlots.size(), is(1));

Review comment:
       Ha! Thanks. The old rule applied here again: I was doing something hacky -- which is a clear indicator of missing something obvious ;)
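The invariant described in the test's Javadoc (the JobMaster's TaskExecutor registrations and the SlotPool must not drift apart when an executor with the same ID registers again) can be illustrated with a small, self-contained Java sketch. Everything in it is hypothetical: `TaskExecutorBookkeepingSketch`, its methods, and the string IDs are stand-ins invented for illustration, not Flink's `JobMaster` or `SlotPool` API, and `resetSlotPool()` is only an assumed model of a leadership change that clears one structure but not the other. What it shows is that a re-registration should update both structures rather than short-circuit because the ID is already known to one of them.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, NOT Flink's JobMaster/SlotPool API.
// String IDs stand in for ResourceID and AllocationID.
final class TaskExecutorBookkeepingSketch {

    // Stand-in for the JobMaster's map of registered TaskExecutors (ID -> address).
    private final Map<String, String> registeredTaskExecutors = new HashMap<>();

    // Stand-in for the slot pool's view of TaskExecutors and the slots they offered.
    private final Map<String, Set<String>> slotPoolExecutors = new HashMap<>();

    /** (Re-)registers an executor in BOTH structures, even if one of them already knows the ID. */
    void registerTaskExecutor(String resourceId, String address) {
        registeredTaskExecutors.put(resourceId, address);
        slotPoolExecutors.putIfAbsent(resourceId, new HashSet<>());
    }

    /** Assumed model of a leadership change that resets only the slot pool. */
    void resetSlotPool() {
        slotPoolExecutors.clear();
    }

    /** Accepts a slot offer only if the executor is known to both structures. */
    boolean offerSlot(String resourceId, String allocationId) {
        Set<String> slots = slotPoolExecutors.get(resourceId);
        if (!registeredTaskExecutors.containsKey(resourceId) || slots == null) {
            return false; // unknown executor, or the two structures are out of sync: reject
        }
        slots.add(allocationId); // re-offering the same allocation is treated as idempotent
        return true;
    }

    public static void main(String[] args) {
        TaskExecutorBookkeepingSketch bookkeeping = new TaskExecutorBookkeepingSketch();
        bookkeeping.registerTaskExecutor("tm-1", "fake");
        bookkeeping.resetSlotPool();                      // leadership change (assumed model)
        bookkeeping.registerTaskExecutor("tm-1", "fake"); // same ID registers again
        System.out.println(bookkeeping.offerSlot("tm-1", "alloc-1")); // prints "true"
    }
}
```

In this sketch, a variant of `registerTaskExecutor` that returned early whenever the ID was already in `registeredTaskExecutors` would leave `slotPoolExecutors` empty after the reset, so the same offer would be rejected, which mirrors the symptom the test guards against.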




----------------------------------------------------------------
This is an automated message from the Apache Git Service.