1996fanrui commented on code in PR #23973: URL: https://github.com/apache/flink/pull/23973#discussion_r1432906607
########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java: ########## @@ -191,12 +184,12 @@ public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exceptio jobMasterGateway.getAddress(), jobId, TIMEOUT); - assertTrue(unMatchedLeaderFuture.get() instanceof RegistrationResponse.Failure); + assertThat(unMatchedLeaderFuture.get()).isInstanceOf(RegistrationResponse.Failure.class); Review Comment: ```suggestion assertThatFuture(unMatchedLeaderFuture) .withThrowableOfType(RegistrationResponse.Failure.class) ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java: ########## @@ -169,18 +166,14 @@ public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exceptio jobMasterGateway.getAddress(), jobId, TIMEOUT); - - try { - unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS); - fail("Should fail because we are using the wrong fencing token."); - } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); - } + assertThatThrownBy(() -> unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS)) + .withFailMessage("Should fail because we are using the wrong fencing token.") + .hasCauseInstanceOf(FencingTokenException.class); Review Comment: How about using `assertThatFuture(invalidAddressFuture)`? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java: ########## @@ -309,12 +305,11 @@ public void revokeLeadership_stopExistLeader() throws Exception { leaderElection.notLeader(); // should terminate RM - assertThat(terminateRmFuture.get(), is(leaderSessionId)); + assertThat(terminateRmFuture.get()).isSameAs(leaderSessionId); Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java: ########## @@ -207,17 +200,16 @@ public void testRegisterJobMasterFromInvalidAddress() throws Exception { invalidAddress, jobId, TIMEOUT); - assertTrue( - invalidAddressFuture.get(5, TimeUnit.SECONDS) - instanceof RegistrationResponse.Failure); + assertThat(invalidAddressFuture.get(5, TimeUnit.SECONDS)) + .isInstanceOf(RegistrationResponse.Failure.class); Review Comment: ```suggestion assertThatFuture(invalidAddressFuture) .failsWithin(5, TimeUnit.SECONDS) .withThrowableOfType(RegistrationResponse.Failure.class) ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -871,32 +868,34 @@ public void testRecoverWorkerFromPreviousAttempt() throws Exception { CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId); assertThat( - registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Success.class)); + registerTaskExecutorFuture.get( + TIMEOUT_SEC, TimeUnit.SECONDS)) + .isInstanceOf(RegistrationResponse.Success.class); }); } }; } /** Tests decline unknown worker registration. */ @Test - public void testRegisterUnknownWorker() throws Exception { + void testRegisterUnknownWorker() throws Exception { new Context() { { runTest( () -> { CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(ResourceID.generate()); assertThat( - registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Rejection.class)); + registerTaskExecutorFuture.get( Review Comment: Is the get needed? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -940,16 +939,15 @@ public void testWorkerRegistrationTimeout() throws Exception { .requestNewWorker(WORKER_RESOURCE_SPEC)); // verify worker is released due to not registered in time - assertThat( - releaseResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), - is(tmResourceId)); + assertThat(releaseResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS)) Review Comment: Is the get needed? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -193,15 +186,15 @@ public void testLessThanDeclareResource() throws Exception { declareResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS); // request new worker. - assertThat(requestCount.get(), is(3)); + assertThat(requestCount.get()).isEqualTo(3); Review Comment: This class has lots of `AtomicInteger`, please update all of them, thanks~ ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java: ########## @@ -254,12 +249,13 @@ public void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader() finishRmTerminationFuture.complete(null); // should start new RM and confirm leader session - assertThat(startRmFuture2.get(), is(leaderSessionId2)); - assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId2)); + assertThat(startRmFuture2.get()).isSameAs(leaderSessionId2); Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java: ########## @@ -365,7 +360,7 @@ public void nonLeaderRmTerminated_doseNotTerminateService() throws Exception { // revoke leadership leaderElection.notLeader(); - assertThat(terminateRmFuture.get(), is(leaderSessionId)); + assertThat(terminateRmFuture.get()).isSameAs(leaderSessionId); Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java: ########## @@ -122,23 +117,19 @@ public void testRemovingJob() throws Exception { // remove the job before we could find a leader jobLeaderIdService.removeJob(jobId); - assertFalse(jobLeaderIdService.containsJob(jobId)); - - try { - leaderIdFuture.get(); + assertThat(jobLeaderIdService.containsJob(jobId)).isFalse(); - fail("The leader id future should be completed exceptionally."); - } catch (ExecutionException ignored) { - // expected exception - } + assertThatThrownBy(leaderIdFuture::get) + .withFailMessage("The leader id future should be completed exceptionally.") + .isInstanceOf(ExecutionException.class); Review Comment: How about `assertThatFuture`? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java: ########## @@ -213,14 +208,14 @@ public void grantLeadership_withExistingLeader_stopExistLeader() throws Exceptio leaderElection.isLeader(leaderSessionId2); // should terminate first RM, start a new RM and confirm leader session - assertThat(terminateRmFuture.get(), is(leaderSessionId1)); - assertThat(startRmFuture2.get(), is(leaderSessionId2)); - assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId2)); + assertThat(terminateRmFuture.get()).isSameAs(leaderSessionId1); + assertThat(startRmFuture2.get()).isSameAs(leaderSessionId2); Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java: ########## @@ -229,13 +221,12 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception { unknownJobIDToHAServices, TIMEOUT); - try { - registrationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); - fail("Expected to fail with a ResourceManagerException."); - } catch (ExecutionException e) { - assertTrue( - ExceptionUtils.stripExecutionException(e) instanceof ResourceManagerException); - } + assertThatThrownBy( + () -> + registrationFuture.get( + TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)) + .withFailMessage("Expected to fail with a ResourceManagerException.") + .isInstanceOf(ResourceManagerException.class); Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java: ########## @@ -195,7 +192,7 @@ public static void registerTaskExecutor( resourceManagerGateway.registerTaskExecutor( taskExecutorRegistration, TestingUtils.TIMEOUT); - assertThat(registrationFuture.get(), instanceOf(RegistrationResponse.Success.class)); + assertThat(registrationFuture.get()).isInstanceOf(RegistrationResponse.Success.class); Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java: ########## @@ -497,40 +492,31 @@ public void grantAndRevokeLeadership_verifyMetrics() throws Exception { MetricNames.NUM_REGISTERED_TASK_MANAGERS, MetricNames.TASK_SLOTS_TOTAL, MetricNames.TASK_SLOTS_AVAILABLE); - Assert.assertTrue( - "Expected RM to register leader metrics", - registeredMetrics.containsAll(expectedMetrics)); + assertThat(registeredMetrics) + .as("Expected RM to register leader metrics") + .containsAll(expectedMetrics); // revoke leadership, block until old rm is terminated revokeLeadership(); Set<String> intersection = new HashSet<>(registeredMetrics); intersection.retainAll(expectedMetrics); - Assert.assertTrue("Expected RM to unregister leader metrics", intersection.isEmpty()); + assertThat(intersection).as("Expected RM to unregister leader metrics").isEmpty(); leaderElection.isLeader(UUID.randomUUID()).join(); - Assert.assertTrue( - "Expected RM to re-register leader metrics", - registeredMetrics.containsAll(expectedMetrics)); + assertThat(registeredMetrics) + .as("Expected RM to re-register leader metrics") + .containsAll(expectedMetrics); } private static void blockOnFuture(CompletableFuture<?> future) { - try { - future.get(); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } + assertThat(future).isCompletedExceptionally(); } private static void assertNotComplete(CompletableFuture<?> future) throws Exception { - try { - future.get(50, TimeUnit.MILLISECONDS); - fail(); - } catch (TimeoutException e) { - // expected - } + assertThatThrownBy(() -> future.get(50, TimeUnit.MILLISECONDS)) Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java: ########## @@ -156,12 +151,12 @@ public void grantLeadership_startRmAndConfirmLeaderSession() throws Exception { leaderElection.isLeader(leaderSessionId); // should start new RM and confirm leader session - assertThat(startRmFuture.get(), is(leaderSessionId)); - assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId)); + assertThat(startRmFuture.get()).isSameAs(leaderSessionId); Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java: ########## @@ -336,31 +321,28 @@ private Collection<SlotStatus> createSlots(int numberSlots) { /** Test receive registration with unmatched leadershipId from task executor. */ @Test - public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception { + void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception { // test throw exception when receive a registration from taskExecutor which takes unmatched // leaderSessionId CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = registerTaskExecutor(wronglyFencedGateway, taskExecutorGateway.getAddress()); - try { - unMatchedLeaderFuture.get(); - fail( - "Should have failed because we are using a wrongly fenced ResourceManagerGateway."); - } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); - } + assertThatThrownBy(unMatchedLeaderFuture::get) Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java: ########## @@ -385,18 +380,18 @@ public void closeService_stopRmAndLeaderElection() throws Exception { // grant leadership leaderElection.isLeader(UUID.randomUUID()).join(); - assertFalse(leaderElection.isStopped()); + assertThat(leaderElection.isStopped()).isFalse(); // close service resourceManagerService.close(); // should stop RM and leader election - assertTrue(terminateRmFuture.isDone()); - assertTrue(leaderElection.isStopped()); + assertThat(terminateRmFuture.isDone()).isTrue(); Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java: ########## @@ -234,17 +225,12 @@ public void testDelayedRegisterTaskExecutor() throws Exception { CompletableFuture<RegistrationResponse> firstFuture = rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout); - try { - firstFuture.get(); - fail( - "Should have failed because connection to taskmanager is delayed beyond timeout"); - } catch (Exception e) { - final Throwable cause = ExceptionUtils.stripExecutionException(e); - assertThat(cause, instanceOf(TimeoutException.class)); - assertThat( - cause.getMessage(), - containsString("ResourceManagerGateway.registerTaskExecutor")); - } + assertThatThrownBy(firstFuture::get) Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -193,15 +186,15 @@ public void testLessThanDeclareResource() throws Exception { declareResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS); // request new worker. - assertThat(requestCount.get(), is(3)); + assertThat(requestCount.get()).isEqualTo(3); Review Comment: get isn't needed. ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java: ########## @@ -336,31 +321,28 @@ private Collection<SlotStatus> createSlots(int numberSlots) { /** Test receive registration with unmatched leadershipId from task executor. */ @Test - public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception { + void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception { // test throw exception when receive a registration from taskExecutor which takes unmatched // leaderSessionId CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = registerTaskExecutor(wronglyFencedGateway, taskExecutorGateway.getAddress()); - try { - unMatchedLeaderFuture.get(); - fail( - "Should have failed because we are using a wrongly fenced ResourceManagerGateway."); - } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); - } + assertThatThrownBy(unMatchedLeaderFuture::get) + .withFailMessage( + "Should have failed because we are using a wrongly fenced ResourceManagerGateway.") + .hasCauseInstanceOf(FencingTokenException.class); } /** Test receive registration with invalid address from task executor. */ @Test - public void testRegisterTaskExecutorFromInvalidAddress() throws Exception { + void testRegisterTaskExecutorFromInvalidAddress() throws Exception { // test throw exception when receive a registration from taskExecutor which takes invalid // address String invalidAddress = "/taskExecutor2"; CompletableFuture<RegistrationResponse> invalidAddressFuture = registerTaskExecutor(rmGateway, invalidAddress); - assertTrue(invalidAddressFuture.get() instanceof RegistrationResponse.Failure); + assertThat(invalidAddressFuture.get()).isInstanceOf(RegistrationResponse.Failure.class); Review Comment: How about assertThatFuture? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -175,7 +168,7 @@ public void testLessThanDeclareResource() throws Exception { .requestNewWorker(WORKER_RESOURCE_SPEC)) .get(TIMEOUT_SEC, TimeUnit.SECONDS); - assertThat(requestCount.get(), is(2)); + assertThat(requestCount.get()).isEqualTo(2); Review Comment: ```suggestion assertThat(requestCount).hasValue(2); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -557,22 +553,23 @@ public void testWorkerTerminatedAfterRegister() throws Exception { .get(1) .get(TIMEOUT_SEC, TimeUnit.SECONDS); - assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1)); + assertThat(taskExecutorProcessSpec2).isSameAs(taskExecutorProcessSpec1); // second worker registered, verify registration succeed CompletableFuture<RegistrationResponse> registerTaskExecutorFuture2 = registerTaskExecutor(tmResourceIds.get(1)); assertThat( - registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Success.class)); + registerTaskExecutorFuture2.get( Review Comment: Is the get needed? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -531,19 +527,19 @@ public void testWorkerTerminatedAfterRegister() throws Exception { .get(TIMEOUT_SEC, TimeUnit.SECONDS); startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS); - assertThat( - taskExecutorProcessSpec1, - is( + assertThat(taskExecutorProcessSpec1) + .isSameAs( TaskExecutorProcessUtils .processSpecFromWorkerResourceSpec( - flinkConfig, WORKER_RESOURCE_SPEC))); + flinkConfig, WORKER_RESOURCE_SPEC)); // first worker registered, verify registration succeed CompletableFuture<RegistrationResponse> registerTaskExecutorFuture1 = registerTaskExecutor(tmResourceIds.get(0)); assertThat( - registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Success.class)); + registerTaskExecutorFuture1.get( Review Comment: Is the get needed? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -609,19 +606,19 @@ public void testWorkerTerminatedNoLongerRequired() throws Exception { .get(TIMEOUT_SEC, TimeUnit.SECONDS); startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS); - assertThat( - taskExecutorProcessSpec, - is( + assertThat(taskExecutorProcessSpec) + .isSameAs( TaskExecutorProcessUtils .processSpecFromWorkerResourceSpec( - flinkConfig, WORKER_RESOURCE_SPEC))); + flinkConfig, WORKER_RESOURCE_SPEC)); // worker registered, verify registration succeed CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId); assertThat( - registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Success.class)); + registerTaskExecutorFuture.get( Review Comment: Is the get needed? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -471,22 +466,23 @@ public void testWorkerTerminatedBeforeRegister() throws Exception { .get(1) .get(TIMEOUT_SEC, TimeUnit.SECONDS); - assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1)); + assertThat(taskExecutorProcessSpec2).isSameAs(taskExecutorProcessSpec1); // second worker registered, verify registration succeed CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceIds.get(1)); assertThat( - registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Success.class)); + registerTaskExecutorFuture.get( Review Comment: Is the get needed? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -756,23 +753,23 @@ public void testStartWorkerIntervalOnWorkerTerminationExceedFailureRate() throws .get(TIMEOUT_SEC, TimeUnit.SECONDS); // validate trying creating worker twice, with proper interval - assertThat( - (t2 - t1), - greaterThanOrEqualTo( - TESTING_START_WORKER_INTERVAL.toMilliseconds())); + assertThat((t2 - t1)) + .isGreaterThanOrEqualTo( + TESTING_START_WORKER_INTERVAL.toMilliseconds()); // second worker registered, verify registration succeed CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceIds.get(1)); assertThat( - registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Success.class)); + registerTaskExecutorFuture.get( Review Comment: Is the get needed? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -837,26 +834,26 @@ public void testStartWorkerIntervalOnRequestWorkerFailure() throws Exception { .get(TIMEOUT_SEC, TimeUnit.SECONDS); // validate trying creating worker twice, with proper interval - assertThat( - (t2 - t1), - greaterThanOrEqualTo( - TESTING_START_WORKER_INTERVAL.toMilliseconds())); + assertThat((t2 - t1)) + .isGreaterThanOrEqualTo( + TESTING_START_WORKER_INTERVAL.toMilliseconds()); // second worker registered, verify registration succeed resourceIdFutures.get(1).complete(tmResourceId); CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId); assertThat( - registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Success.class)); + registerTaskExecutorFuture.get( Review Comment: Is the get needed? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -390,23 +385,24 @@ public void testStartNewWorkerFailedRequesting() throws Exception { .get(1) .get(TIMEOUT_SEC, TimeUnit.SECONDS); - assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1)); + assertThat(taskExecutorProcessSpec2).isSameAs(taskExecutorProcessSpec1); // second request allocated, verify registration succeed runInMainThread(() -> resourceIdFutures.get(1).complete(tmResourceId)); CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId); assertThat( - registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Success.class)); + registerTaskExecutorFuture.get( Review Comment: Is the get needed? ########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java: ########## @@ -871,32 +868,34 @@ public void testRecoverWorkerFromPreviousAttempt() throws Exception { CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId); assertThat( - registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), - instanceOf(RegistrationResponse.Success.class)); + registerTaskExecutorFuture.get( Review Comment: Is the get needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org