tillrohrmann commented on a change in pull request #13639:
URL: https://github.com/apache/flink/pull/13639#discussion_r505238512
##########
File path:
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyCheckInactiveITCase.java
##########
@@ -47,6 +50,7 @@
*/
@Test
public void testWithNoConcurrencyCheck() throws Exception {
+ assumeFalse(log.isDebugEnabled()); // this test will fail on
DEBUG log level.
Review comment:
I think it would be good to explain in the comment why the test fails on DEBUG log level.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -280,6 +286,65 @@ public static void teardownClass() {
}
}
+ /**
+ * This test ensures that the bookkeeping of TaskExecutors in the
JobMaster handles cases where TaskExecutors with the same
+ * ID re-register properly. FLINK-19237 was a bug where the
TaskExecutors and the SlotPool got out of sync, and
+ * slot offers were rejected.
+ */
+ @Test
+ public void testAcceptSlotOfferAfterLeaderchange() throws Exception {
Review comment:
```suggestion
public void testAcceptSlotOfferAfterLeaderChange() throws Exception {
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -2177,4 +2242,58 @@ public void disposeStorageLocation() throws IOException {
}
}
+
+
+ private static class TestingJobMaster extends JobMaster {
+
+ public TestingJobMaster(RpcService rpcService,
+ JobMasterConfiguration jobMasterConfiguration,
+ ResourceID resourceId,
+ JobGraph jobGraph,
+ HighAvailabilityServices
highAvailabilityService,
+ SlotPoolFactory slotPoolFactory,
+ JobManagerSharedServices
jobManagerSharedServices,
+ HeartbeatServices heartbeatServices,
+ JobManagerJobMetricGroupFactory
jobMetricGroupFactory,
+ OnCompletionActions jobCompletionActions,
+ FatalErrorHandler fatalErrorHandler,
+ ClassLoader userCodeLoader,
+ SchedulerNGFactory schedulerNGFactory,
+ ShuffleMaster<?> shuffleMaster,
+ PartitionTrackerFactory partitionTrackerFactory,
+ ExecutionDeploymentTracker
executionDeploymentTracker,
+ ExecutionDeploymentReconciler.Factory
executionDeploymentReconcilerFactory,
+ long initializationTimestamp) throws Exception {
+ super(rpcService,
+ jobMasterConfiguration,
+ resourceId,
+ jobGraph,
+ highAvailabilityService,
+ slotPoolFactory,
+ jobManagerSharedServices,
+ heartbeatServices,
+ jobMetricGroupFactory,
+ jobCompletionActions,
+ fatalErrorHandler,
+ userCodeLoader,
+ schedulerNGFactory,
+ shuffleMaster,
+ partitionTrackerFactory,
+ executionDeploymentTracker,
+ executionDeploymentReconcilerFactory,
+ initializationTimestamp);
+ }
+
+ public <T> CompletableFuture<T>
executeInMainThreadExecutor(SupplierWithException<T, Throwable> runnable) {
+ CompletableFuture<T> result = new CompletableFuture<>();
+ getMainThreadExecutor().execute(() -> {
+ try {
+ result.complete(runnable.get());
+ } catch (Throwable throwable) {
+ result.completeExceptionally(throwable);
+ }
+ });
+ return result;
+ }
+ }
Review comment:
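This class should not be needed: it only adds `executeInMainThreadExecutor`, which becomes unnecessary if the test sends its RPCs through the gateway returned by `jobMaster.getSelfGateway(JobMasterGateway.class)`.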
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -280,6 +286,65 @@ public static void teardownClass() {
}
}
+ /**
+ * This test ensures that the bookkeeping of TaskExecutors in the
JobMaster handles cases where TaskExecutors with the same
+ * ID re-register properly. FLINK-19237 was a bug where the
TaskExecutors and the SlotPool got out of sync, and
+ * slot offers were rejected.
+ */
+ @Test
+ public void testAcceptSlotOfferAfterLeaderchange() throws Exception {
+
+ final JobManagerSharedServices jobManagerSharedServices = new
TestingJobManagerSharedServicesBuilder().build();
+ final JobMasterConfiguration jobMasterConfiguration =
JobMasterConfiguration.fromConfiguration(configuration);
+
+ final SchedulerNGFactory schedulerNGFactory =
SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+
+ final TestingJobMaster jobMaster = new TestingJobMaster(
+ rpcService,
+ jobMasterConfiguration,
+ jmResourceId,
+ jobGraph,
+ haServices,
+ SlotPoolFactory.fromConfiguration(configuration),
+ jobManagerSharedServices,
+ heartbeatServices,
+ UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+ new JobMasterBuilder.TestingOnCompletionActions(),
+ testingFatalErrorHandler,
+ JobMasterTest.class.getClassLoader(),
+ schedulerNGFactory,
+ NettyShuffleMaster.INSTANCE,
+ NoOpJobMasterPartitionTracker.FACTORY,
+ new DefaultExecutionDeploymentTracker(),
+ DefaultExecutionDeploymentReconciler::new,
+ System.currentTimeMillis());
+
+ jobMaster.start(jobMasterId).get();
+
+ log.info("Register TaskManager");
+
+ String testingTaskManagerAddress = "fake";
+ UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
new LocalUnresolvedTaskManagerLocation();
+ TestingTaskExecutorGateway testingTaskExecutorGateway = new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+ rpcService.registerGateway(testingTaskManagerAddress,
testingTaskExecutorGateway);
+
Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress,
unresolvedTaskManagerLocation, testingTimeout).get(),
instanceOf(RegistrationResponse.Success.class));
+
+ log.info("Revoke leadership & re-grant leadership");
+ jobMaster.suspend(new FlinkException("Lost leadership")).get();
+
+ jobMaster.start(JobMasterId.generate()).get();
+
+ log.info("re-register same TaskManager");
+
Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress,
unresolvedTaskManagerLocation, testingTimeout).get(),
instanceOf(RegistrationResponse.Success.class));
+
+ log.info("Ensure JobMaster accepts slot offer");
+ final SlotOffer slotOffer = new SlotOffer(new AllocationID(),
0, ResourceProfile.ANY);
+
+ Collection<SlotOffer> acceptedSlots =
jobMaster.executeInMainThreadExecutor(() ->
jobMaster.offerSlots(unresolvedTaskManagerLocation.getResourceID(),
Collections.singleton(slotOffer), testingTimeout).get()).get();
+ Assert.assertThat(acceptedSlots.size(), is(1));
Review comment:
Instead of calling methods directly on the `JobMaster`, I suggest retrieving
the self gateway via
`jobMaster.getSelfGateway(JobMasterGateway.class)` and then using this
interface to send RPCs. That way, we also don't need
`executeInMainThreadExecutor`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]