1996fanrui commented on code in PR #23973:
URL: https://github.com/apache/flink/pull/23973#discussion_r1432906607


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java:
##########
@@ -191,12 +184,12 @@ public void 
testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exceptio
                         jobMasterGateway.getAddress(),
                         jobId,
                         TIMEOUT);
-        assertTrue(unMatchedLeaderFuture.get() instanceof 
RegistrationResponse.Failure);
+        
assertThat(unMatchedLeaderFuture.get()).isInstanceOf(RegistrationResponse.Failure.class);

Review Comment:
   ```suggestion
           assertThatFuture(unMatchedLeaderFuture)
                   .withThrowableOfType(RegistrationResponse.Failure.class)
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java:
##########
@@ -169,18 +166,14 @@ public void 
testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exceptio
                         jobMasterGateway.getAddress(),
                         jobId,
                         TIMEOUT);
-
-        try {
-            unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS);
-            fail("Should fail because we are using the wrong fencing token.");
-        } catch (ExecutionException e) {
-            assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenException);
-        }
+        assertThatThrownBy(() -> unMatchedLeaderFuture.get(5L, 
TimeUnit.SECONDS))
+                .withFailMessage("Should fail because we are using the wrong 
fencing token.")
+                .hasCauseInstanceOf(FencingTokenException.class);

Review Comment:
   How about using `assertThatFuture(invalidAddressFuture)`?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java:
##########
@@ -309,12 +305,11 @@ public void revokeLeadership_stopExistLeader() throws 
Exception {
         leaderElection.notLeader();
 
         // should terminate RM
-        assertThat(terminateRmFuture.get(), is(leaderSessionId));
+        assertThat(terminateRmFuture.get()).isSameAs(leaderSessionId);

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java:
##########
@@ -207,17 +200,16 @@ public void testRegisterJobMasterFromInvalidAddress() 
throws Exception {
                         invalidAddress,
                         jobId,
                         TIMEOUT);
-        assertTrue(
-                invalidAddressFuture.get(5, TimeUnit.SECONDS)
-                        instanceof RegistrationResponse.Failure);
+        assertThat(invalidAddressFuture.get(5, TimeUnit.SECONDS))
+                .isInstanceOf(RegistrationResponse.Failure.class);

Review Comment:
   ```suggestion
           assertThatFuture(invalidAddressFuture)
                   .failsWithin(5, TimeUnit.SECONDS)
                   .withThrowableOfType(RegistrationResponse.Failure.class)
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -871,32 +868,34 @@ public void testRecoverWorkerFromPreviousAttempt() throws 
Exception {
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
                             assertThat(
-                                    
registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Success.class));
+                                            registerTaskExecutorFuture.get(
+                                                    TIMEOUT_SEC, 
TimeUnit.SECONDS))
+                                    
.isInstanceOf(RegistrationResponse.Success.class);
                         });
             }
         };
     }
 
     /** Tests decline unknown worker registration. */
     @Test
-    public void testRegisterUnknownWorker() throws Exception {
+    void testRegisterUnknownWorker() throws Exception {
         new Context() {
             {
                 runTest(
                         () -> {
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture =
                                     
registerTaskExecutor(ResourceID.generate());
                             assertThat(
-                                    
registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Rejection.class));
+                                            registerTaskExecutorFuture.get(

Review Comment:
   Is the get needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -940,16 +939,15 @@ public void testWorkerRegistrationTimeout() throws 
Exception {
                                                     
.requestNewWorker(WORKER_RESOURCE_SPEC));
 
                             // verify worker is released due to not registered 
in time
-                            assertThat(
-                                    releaseResourceFuture.get(TIMEOUT_SEC, 
TimeUnit.SECONDS),
-                                    is(tmResourceId));
+                            assertThat(releaseResourceFuture.get(TIMEOUT_SEC, 
TimeUnit.SECONDS))

Review Comment:
   Is the get needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -193,15 +186,15 @@ public void testLessThanDeclareResource() throws 
Exception {
                             declareResourceFuture.get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
 
                             // request new worker.
-                            assertThat(requestCount.get(), is(3));
+                            assertThat(requestCount.get()).isEqualTo(3);

Review Comment:
   This class has lots of `AtomicInteger`, please update all of them, thanks~



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java:
##########
@@ -254,12 +249,13 @@ public void 
grantLeadership_withExistingLeader_waitTerminationOfExistingLeader()
         finishRmTerminationFuture.complete(null);
 
         // should start new RM and confirm leader session
-        assertThat(startRmFuture2.get(), is(leaderSessionId2));
-        assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), 
is(leaderSessionId2));
+        assertThat(startRmFuture2.get()).isSameAs(leaderSessionId2);

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java:
##########
@@ -365,7 +360,7 @@ public void nonLeaderRmTerminated_doseNotTerminateService() 
throws Exception {
 
         // revoke leadership
         leaderElection.notLeader();
-        assertThat(terminateRmFuture.get(), is(leaderSessionId));
+        assertThat(terminateRmFuture.get()).isSameAs(leaderSessionId);

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java:
##########
@@ -122,23 +117,19 @@ public void testRemovingJob() throws Exception {
         // remove the job before we could find a leader
         jobLeaderIdService.removeJob(jobId);
 
-        assertFalse(jobLeaderIdService.containsJob(jobId));
-
-        try {
-            leaderIdFuture.get();
+        assertThat(jobLeaderIdService.containsJob(jobId)).isFalse();
 
-            fail("The leader id future should be completed exceptionally.");
-        } catch (ExecutionException ignored) {
-            // expected exception
-        }
+        assertThatThrownBy(leaderIdFuture::get)
+                .withFailMessage("The leader id future should be completed 
exceptionally.")
+                .isInstanceOf(ExecutionException.class);

Review Comment:
   How about `assertThatFuture`?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java:
##########
@@ -213,14 +208,14 @@ public void 
grantLeadership_withExistingLeader_stopExistLeader() throws Exceptio
                 leaderElection.isLeader(leaderSessionId2);
 
         // should terminate first RM, start a new RM and confirm leader session
-        assertThat(terminateRmFuture.get(), is(leaderSessionId1));
-        assertThat(startRmFuture2.get(), is(leaderSessionId2));
-        assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), 
is(leaderSessionId2));
+        assertThat(terminateRmFuture.get()).isSameAs(leaderSessionId1);
+        assertThat(startRmFuture2.get()).isSameAs(leaderSessionId2);

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java:
##########
@@ -229,13 +221,12 @@ public void 
testRegisterJobMasterWithFailureLeaderListener() throws Exception {
                         unknownJobIDToHAServices,
                         TIMEOUT);
 
-        try {
-            registrationFuture.get(TIMEOUT.toMilliseconds(), 
TimeUnit.MILLISECONDS);
-            fail("Expected to fail with a ResourceManagerException.");
-        } catch (ExecutionException e) {
-            assertTrue(
-                    ExceptionUtils.stripExecutionException(e) instanceof 
ResourceManagerException);
-        }
+        assertThatThrownBy(
+                        () ->
+                                registrationFuture.get(
+                                        TIMEOUT.toMilliseconds(), 
TimeUnit.MILLISECONDS))
+                .withFailMessage("Expected to fail with a 
ResourceManagerException.")
+                .isInstanceOf(ResourceManagerException.class);

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java:
##########
@@ -195,7 +192,7 @@ public static void registerTaskExecutor(
                 resourceManagerGateway.registerTaskExecutor(
                         taskExecutorRegistration, TestingUtils.TIMEOUT);
 
-        assertThat(registrationFuture.get(), 
instanceOf(RegistrationResponse.Success.class));
+        
assertThat(registrationFuture.get()).isInstanceOf(RegistrationResponse.Success.class);

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java:
##########
@@ -497,40 +492,31 @@ public void grantAndRevokeLeadership_verifyMetrics() 
throws Exception {
                         MetricNames.NUM_REGISTERED_TASK_MANAGERS,
                         MetricNames.TASK_SLOTS_TOTAL,
                         MetricNames.TASK_SLOTS_AVAILABLE);
-        Assert.assertTrue(
-                "Expected RM to register leader metrics",
-                registeredMetrics.containsAll(expectedMetrics));
+        assertThat(registeredMetrics)
+                .as("Expected RM to register leader metrics")
+                .containsAll(expectedMetrics);
 
         // revoke leadership, block until old rm is terminated
         revokeLeadership();
 
         Set<String> intersection = new HashSet<>(registeredMetrics);
         intersection.retainAll(expectedMetrics);
-        Assert.assertTrue("Expected RM to unregister leader metrics", 
intersection.isEmpty());
+        assertThat(intersection).as("Expected RM to unregister leader 
metrics").isEmpty();
 
         leaderElection.isLeader(UUID.randomUUID()).join();
 
-        Assert.assertTrue(
-                "Expected RM to re-register leader metrics",
-                registeredMetrics.containsAll(expectedMetrics));
+        assertThat(registeredMetrics)
+                .as("Expected RM to re-register leader metrics")
+                .containsAll(expectedMetrics);
     }
 
     private static void blockOnFuture(CompletableFuture<?> future) {
-        try {
-            future.get();
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail();
-        }
+        assertThat(future).isCompletedExceptionally();
     }
 
     private static void assertNotComplete(CompletableFuture<?> future) throws 
Exception {
-        try {
-            future.get(50, TimeUnit.MILLISECONDS);
-            fail();
-        } catch (TimeoutException e) {
-            // expected
-        }
+        assertThatThrownBy(() -> future.get(50, TimeUnit.MILLISECONDS))

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java:
##########
@@ -156,12 +151,12 @@ public void 
grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
                 leaderElection.isLeader(leaderSessionId);
 
         // should start new RM and confirm leader session
-        assertThat(startRmFuture.get(), is(leaderSessionId));
-        assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), 
is(leaderSessionId));
+        assertThat(startRmFuture.get()).isSameAs(leaderSessionId);

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java:
##########
@@ -336,31 +321,28 @@ private Collection<SlotStatus> createSlots(int 
numberSlots) {
 
     /** Test receive registration with unmatched leadershipId from task 
executor. */
     @Test
-    public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws 
Exception {
+    void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws 
Exception {
         // test throw exception when receive a registration from taskExecutor 
which takes unmatched
         // leaderSessionId
         CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
                 registerTaskExecutor(wronglyFencedGateway, 
taskExecutorGateway.getAddress());
 
-        try {
-            unMatchedLeaderFuture.get();
-            fail(
-                    "Should have failed because we are using a wrongly fenced 
ResourceManagerGateway.");
-        } catch (ExecutionException e) {
-            assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenException);
-        }
+        assertThatThrownBy(unMatchedLeaderFuture::get)

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java:
##########
@@ -385,18 +380,18 @@ public void closeService_stopRmAndLeaderElection() throws 
Exception {
         // grant leadership
         leaderElection.isLeader(UUID.randomUUID()).join();
 
-        assertFalse(leaderElection.isStopped());
+        assertThat(leaderElection.isStopped()).isFalse();
 
         // close service
         resourceManagerService.close();
 
         // should stop RM and leader election
-        assertTrue(terminateRmFuture.isDone());
-        assertTrue(leaderElection.isStopped());
+        assertThat(terminateRmFuture.isDone()).isTrue();

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java:
##########
@@ -234,17 +225,12 @@ public void testDelayedRegisterTaskExecutor() throws 
Exception {
 
             CompletableFuture<RegistrationResponse> firstFuture =
                     rmGateway.registerTaskExecutor(taskExecutorRegistration, 
fastTimeout);
-            try {
-                firstFuture.get();
-                fail(
-                        "Should have failed because connection to taskmanager 
is delayed beyond timeout");
-            } catch (Exception e) {
-                final Throwable cause = 
ExceptionUtils.stripExecutionException(e);
-                assertThat(cause, instanceOf(TimeoutException.class));
-                assertThat(
-                        cause.getMessage(),
-                        
containsString("ResourceManagerGateway.registerTaskExecutor"));
-            }
+            assertThatThrownBy(firstFuture::get)

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -193,15 +186,15 @@ public void testLessThanDeclareResource() throws 
Exception {
                             declareResourceFuture.get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
 
                             // request new worker.
-                            assertThat(requestCount.get(), is(3));
+                            assertThat(requestCount.get()).isEqualTo(3);

Review Comment:
   get isn't needed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java:
##########
@@ -336,31 +321,28 @@ private Collection<SlotStatus> createSlots(int 
numberSlots) {
 
     /** Test receive registration with unmatched leadershipId from task 
executor. */
     @Test
-    public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws 
Exception {
+    void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws 
Exception {
         // test throw exception when receive a registration from taskExecutor 
which takes unmatched
         // leaderSessionId
         CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
                 registerTaskExecutor(wronglyFencedGateway, 
taskExecutorGateway.getAddress());
 
-        try {
-            unMatchedLeaderFuture.get();
-            fail(
-                    "Should have failed because we are using a wrongly fenced 
ResourceManagerGateway.");
-        } catch (ExecutionException e) {
-            assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenException);
-        }
+        assertThatThrownBy(unMatchedLeaderFuture::get)
+                .withFailMessage(
+                        "Should have failed because we are using a wrongly 
fenced ResourceManagerGateway.")
+                .hasCauseInstanceOf(FencingTokenException.class);
     }
 
     /** Test receive registration with invalid address from task executor. */
     @Test
-    public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
+    void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
         // test throw exception when receive a registration from taskExecutor 
which takes invalid
         // address
         String invalidAddress = "/taskExecutor2";
 
         CompletableFuture<RegistrationResponse> invalidAddressFuture =
                 registerTaskExecutor(rmGateway, invalidAddress);
-        assertTrue(invalidAddressFuture.get() instanceof 
RegistrationResponse.Failure);
+        
assertThat(invalidAddressFuture.get()).isInstanceOf(RegistrationResponse.Failure.class);

Review Comment:
   How about assertThatFuture?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -175,7 +168,7 @@ public void testLessThanDeclareResource() throws Exception {
                                                             
.requestNewWorker(WORKER_RESOURCE_SPEC))
                                     .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(requestCount.get(), is(2));
+                            assertThat(requestCount.get()).isEqualTo(2);

Review Comment:
   ```suggestion
                               assertThat(requestCount).hasValue(2);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -557,22 +553,23 @@ public void testWorkerTerminatedAfterRegister() throws 
Exception {
                                             .get(1)
                                             .get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
 
-                            assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+                            
assertThat(taskExecutorProcessSpec2).isSameAs(taskExecutorProcessSpec1);
 
                             // second worker registered, verify registration 
succeed
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture2 =
                                     registerTaskExecutor(tmResourceIds.get(1));
                             assertThat(
-                                    
registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Success.class));
+                                            registerTaskExecutorFuture2.get(

Review Comment:
   Is the get needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -531,19 +527,19 @@ public void testWorkerTerminatedAfterRegister() throws 
Exception {
                                             .get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
 
                             startNewWorkerFuture.get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
-                            assertThat(
-                                    taskExecutorProcessSpec1,
-                                    is(
+                            assertThat(taskExecutorProcessSpec1)
+                                    .isSameAs(
                                             TaskExecutorProcessUtils
                                                     
.processSpecFromWorkerResourceSpec(
-                                                            flinkConfig, 
WORKER_RESOURCE_SPEC)));
+                                                            flinkConfig, 
WORKER_RESOURCE_SPEC));
 
                             // first worker registered, verify registration 
succeed
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture1 =
                                     registerTaskExecutor(tmResourceIds.get(0));
                             assertThat(
-                                    
registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Success.class));
+                                            registerTaskExecutorFuture1.get(

Review Comment:
   Is the get needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -609,19 +606,19 @@ public void testWorkerTerminatedNoLongerRequired() throws 
Exception {
                                             .get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
 
                             startNewWorkerFuture.get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
-                            assertThat(
-                                    taskExecutorProcessSpec,
-                                    is(
+                            assertThat(taskExecutorProcessSpec)
+                                    .isSameAs(
                                             TaskExecutorProcessUtils
                                                     
.processSpecFromWorkerResourceSpec(
-                                                            flinkConfig, 
WORKER_RESOURCE_SPEC)));
+                                                            flinkConfig, 
WORKER_RESOURCE_SPEC));
 
                             // worker registered, verify registration succeed
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
                             assertThat(
-                                    
registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Success.class));
+                                            registerTaskExecutorFuture.get(

Review Comment:
   Is the get needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -471,22 +466,23 @@ public void testWorkerTerminatedBeforeRegister() throws 
Exception {
                                             .get(1)
                                             .get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
 
-                            assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+                            
assertThat(taskExecutorProcessSpec2).isSameAs(taskExecutorProcessSpec1);
 
                             // second worker registered, verify registration 
succeed
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceIds.get(1));
                             assertThat(
-                                    
registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Success.class));
+                                            registerTaskExecutorFuture.get(

Review Comment:
   Is the get needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -756,23 +753,23 @@ public void 
testStartWorkerIntervalOnWorkerTerminationExceedFailureRate() throws
                                             .get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
 
                             // validate trying creating worker twice, with 
proper interval
-                            assertThat(
-                                    (t2 - t1),
-                                    greaterThanOrEqualTo(
-                                            
TESTING_START_WORKER_INTERVAL.toMilliseconds()));
+                            assertThat((t2 - t1))
+                                    .isGreaterThanOrEqualTo(
+                                            
TESTING_START_WORKER_INTERVAL.toMilliseconds());
                             // second worker registered, verify registration 
succeed
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceIds.get(1));
                             assertThat(
-                                    
registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Success.class));
+                                            registerTaskExecutorFuture.get(

Review Comment:
   Is the get needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -837,26 +834,26 @@ public void 
testStartWorkerIntervalOnRequestWorkerFailure() throws Exception {
                                             .get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
 
                             // validate trying creating worker twice, with 
proper interval
-                            assertThat(
-                                    (t2 - t1),
-                                    greaterThanOrEqualTo(
-                                            
TESTING_START_WORKER_INTERVAL.toMilliseconds()));
+                            assertThat((t2 - t1))
+                                    .isGreaterThanOrEqualTo(
+                                            
TESTING_START_WORKER_INTERVAL.toMilliseconds());
 
                             // second worker registered, verify registration 
succeed
                             resourceIdFutures.get(1).complete(tmResourceId);
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
                             assertThat(
-                                    
registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Success.class));
+                                            registerTaskExecutorFuture.get(

Review Comment:
   Is the get needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -390,23 +385,24 @@ public void testStartNewWorkerFailedRequesting() throws 
Exception {
                                             .get(1)
                                             .get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
 
-                            assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+                            
assertThat(taskExecutorProcessSpec2).isSameAs(taskExecutorProcessSpec1);
 
                             // second request allocated, verify registration 
succeed
                             runInMainThread(() -> 
resourceIdFutures.get(1).complete(tmResourceId));
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
                             assertThat(
-                                    
registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Success.class));
+                                            registerTaskExecutorFuture.get(

Review Comment:
   Is the get needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java:
##########
@@ -871,32 +868,34 @@ public void testRecoverWorkerFromPreviousAttempt() throws 
Exception {
                             CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
                             assertThat(
-                                    
registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    
instanceOf(RegistrationResponse.Success.class));
+                                            registerTaskExecutorFuture.get(

Review Comment:
   Is the get needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to