tillrohrmann commented on a change in pull request #13639:
URL: https://github.com/apache/flink/pull/13639#discussion_r505238512



##########
File path: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyCheckInactiveITCase.java
##########
@@ -47,6 +50,7 @@
         */
        @Test
        public void testWithNoConcurrencyCheck() throws Exception {
+               assumeFalse(log.isDebugEnabled()); // this test will fail on DEBUG log level.

Review comment:
       I think it would be good to explain why the test fails.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -280,6 +286,65 @@ public static void teardownClass() {
                }
        }
 
+       /**
+        * This test ensures that the bookkeeping of TaskExecutors in the JobMaster handles cases where TaskExecutors with the same
+        * ID re-register properly. FLINK-19237 was a bug where the TaskExecutors and the SlotPool got out of sync, and
+        * slot offers were rejected.
+        */
+       @Test
+       public void testAcceptSlotOfferAfterLeaderchange() throws Exception {

Review comment:
       ```suggestion
        public void testAcceptSlotOfferAfterLeaderChange() throws Exception {
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -2177,4 +2242,58 @@ public void disposeStorageLocation() throws IOException {
 
                }
        }
+
+
+       private static class TestingJobMaster extends JobMaster {
+
+               public TestingJobMaster(RpcService rpcService,
+                               JobMasterConfiguration jobMasterConfiguration,
+                               ResourceID resourceId,
+                               JobGraph jobGraph,
+                               HighAvailabilityServices highAvailabilityService,
+                               SlotPoolFactory slotPoolFactory,
+                               JobManagerSharedServices jobManagerSharedServices,
+                               HeartbeatServices heartbeatServices,
+                               JobManagerJobMetricGroupFactory jobMetricGroupFactory,
+                               OnCompletionActions jobCompletionActions,
+                               FatalErrorHandler fatalErrorHandler,
+                               ClassLoader userCodeLoader,
+                               SchedulerNGFactory schedulerNGFactory,
+                               ShuffleMaster<?> shuffleMaster,
+                               PartitionTrackerFactory partitionTrackerFactory,
+                               ExecutionDeploymentTracker executionDeploymentTracker,
+                               ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
+                               long initializationTimestamp) throws Exception {
+                       super(rpcService,
+                               jobMasterConfiguration,
+                               resourceId,
+                               jobGraph,
+                               highAvailabilityService,
+                               slotPoolFactory,
+                               jobManagerSharedServices,
+                               heartbeatServices,
+                               jobMetricGroupFactory,
+                               jobCompletionActions,
+                               fatalErrorHandler,
+                               userCodeLoader,
+                               schedulerNGFactory,
+                               shuffleMaster,
+                               partitionTrackerFactory,
+                               executionDeploymentTracker,
+                               executionDeploymentReconcilerFactory,
+                               initializationTimestamp);
+               }
+
+               public <T> CompletableFuture<T> executeInMainThreadExecutor(SupplierWithException<T, Throwable> runnable) {
+                       CompletableFuture<T> result = new CompletableFuture<>();
+                       getMainThreadExecutor().execute(() -> {
+                               try {
+                                       result.complete(runnable.get());
+                               } catch (Throwable throwable) {
+                                       result.completeExceptionally(throwable);
+                               }
+                       });
+                       return result;
+               }
+       }

Review comment:
       This class should not be needed.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -280,6 +286,65 @@ public static void teardownClass() {
                }
        }
 
+       /**
+        * This test ensures that the bookkeeping of TaskExecutors in the JobMaster handles cases where TaskExecutors with the same
+        * ID re-register properly. FLINK-19237 was a bug where the TaskExecutors and the SlotPool got out of sync, and
+        * slot offers were rejected.
+        */
+       @Test
+       public void testAcceptSlotOfferAfterLeaderchange() throws Exception {
+
+               final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+               final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
+               final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+
+               final TestingJobMaster jobMaster = new TestingJobMaster(
+                       rpcService,
+                       jobMasterConfiguration,
+                       jmResourceId,
+                       jobGraph,
+                       haServices,
+                       SlotPoolFactory.fromConfiguration(configuration),
+                       jobManagerSharedServices,
+                       heartbeatServices,
+                       UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+                       new JobMasterBuilder.TestingOnCompletionActions(),
+                       testingFatalErrorHandler,
+                       JobMasterTest.class.getClassLoader(),
+                       schedulerNGFactory,
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpJobMasterPartitionTracker.FACTORY,
+                       new DefaultExecutionDeploymentTracker(),
+                       DefaultExecutionDeploymentReconciler::new,
+                       System.currentTimeMillis());
+
+               jobMaster.start(jobMasterId).get();
+
+               log.info("Register TaskManager");
+
+               String testingTaskManagerAddress = "fake";
+               UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
+               TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+               rpcService.registerGateway(testingTaskManagerAddress, testingTaskExecutorGateway);
+               Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress, unresolvedTaskManagerLocation, testingTimeout).get(), instanceOf(RegistrationResponse.Success.class));
+
+               log.info("Revoke leadership & re-grant leadership");
+               jobMaster.suspend(new FlinkException("Lost leadership")).get();
+
+               jobMaster.start(JobMasterId.generate()).get();
+
+               log.info("re-register same TaskManager");
+               Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress, unresolvedTaskManagerLocation, testingTimeout).get(), instanceOf(RegistrationResponse.Success.class));
+
+               log.info("Ensure JobMaster accepts slot offer");
+               final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
+
+               Collection<SlotOffer> acceptedSlots = jobMaster.executeInMainThreadExecutor(() -> jobMaster.offerSlots(unresolvedTaskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get()).get();
+               Assert.assertThat(acceptedSlots.size(), is(1));

Review comment:
       Instead of calling methods directly on the `JobMaster`, I suggest 
retrieving the self gateway via 
`jobMaster.getSelfGateway(JobMasterGateway.class)` and using that 
interface to send RPCs. That way, we also don't need 
`executeInMainThreadExecutor`.
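
       To illustrate why going through a gateway makes the helper unnecessary, here is a minimal, self-contained sketch of the idea. It is not Flink code: `SelfGatewaySketch`, `SlotGateway`, and `Endpoint` are hypothetical stand-ins for the test, `JobMasterGateway`, and `JobMaster`, and the proxy only approximates how an RPC service might dispatch calls to the endpoint's main thread.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not Flink's implementation.
class SelfGatewaySketch {

    /** Hypothetical stand-in for a gateway interface such as JobMasterGateway. */
    public interface SlotGateway {
        CompletableFuture<Integer> offerSlots(int offeredSlots);
    }

    /** Hypothetical stand-in for an RPC endpoint such as JobMaster. */
    public static class Endpoint implements SlotGateway {
        private static final String MAIN_THREAD_NAME = "endpoint-main-thread";

        // All endpoint state is supposed to be touched from this single thread only.
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, MAIN_THREAD_NAME);
            t.setDaemon(true);
            return t;
        });

        @Override
        public CompletableFuture<Integer> offerSlots(int offeredSlots) {
            // Direct calls from a test thread fail here, which is what forces
            // a helper like executeInMainThreadExecutor in the first place.
            if (!MAIN_THREAD_NAME.equals(Thread.currentThread().getName())) {
                throw new IllegalStateException("offerSlots called outside the main thread");
            }
            return CompletableFuture.completedFuture(offeredSlots);
        }

        /** Returns a proxy that runs every gateway call on the main thread. */
        public SlotGateway getSelfGateway() {
            InvocationHandler handler = (proxy, method, args) -> {
                CompletableFuture<Object> result = new CompletableFuture<>();
                mainThread.execute(() -> {
                    try {
                        // Every method on SlotGateway returns a CompletableFuture.
                        CompletableFuture<?> inner = (CompletableFuture<?>) method.invoke(this, args);
                        inner.whenComplete((value, error) -> {
                            if (error != null) {
                                result.completeExceptionally(error);
                            } else {
                                result.complete(value);
                            }
                        });
                    } catch (Throwable t) {
                        result.completeExceptionally(t);
                    }
                });
                return result;
            };
            return (SlotGateway) Proxy.newProxyInstance(
                SlotGateway.class.getClassLoader(), new Class<?>[] {SlotGateway.class}, handler);
        }
    }
}
```

       With something like this in place, the test thread only ever talks to the gateway, e.g. `endpoint.getSelfGateway().offerSlots(1).join()`, and the thread hand-off happens behind the interface instead of in a test-only subclass.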



