tillrohrmann commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r516800376



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderInformation.java
##########
@@ -74,7 +78,7 @@ public int hashCode() {
                return result;
        }
 
-       public static LeaderInformation known(@Nullable UUID leaderSessionID, 
@Nullable String leaderAddress) {
+       public static LeaderInformation known(UUID leaderSessionID, String 
leaderAddress) {

Review comment:
       add assert that `leaderSessionID` and `leaderAddress` is not null. 
Otherwise this method will allow us to break `isEmpty`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
##########
@@ -189,7 +190,7 @@ public void testStopAtNonRetryableException() {
                        retryFuture.get();
                        fail("Exception should be thrown.");
                } catch (Exception ex) {
-                       assertThat(ExceptionUtils.findThrowableWithMessage(ex, 
notRetryExceptionMsg).isPresent(), is(true));
+                       assertThat(ex, FlinkMatchers.containsCause(new 
FlinkRuntimeException(notRetryExceptionMsg)));

Review comment:
       nit: `new FlinkRuntimeException(notRetryExceptionMsg)` could be 
deduplicated.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
##########
@@ -177,25 +178,32 @@ public void testOldConfirmLeaderInformation() throws 
Exception {
        public void testErrorForwarding() throws Exception {
                new Context() {{
                        runTest(() -> {
-                               final Exception testException = new 
Exception("test leader Exeption");
+                               final Exception testException = new 
Exception("test leader exception");
 
-                               
testingLeaderElectionDriver.getFatalErrorHandler().onFatalError(testException);
+                               
testingLeaderElectionDriver.onFatalError(testException);
 
                                testingContender.waitForError(timeout);
                                assertThat(testingContender.getError(), 
is(notNullValue()));
-                               
assertThat(testingContender.getError().getMessage(), 
containsString(testException.getMessage()));
+                               assertThat(testingContender.getError(), 
FlinkMatchers.containsCause(testException));
                        });
                }};
        }
 
        @Test
-       public void testErrorHappenAfterStop() throws Exception {
+       public void testErrorIsIgnoredAfterBeingStop() throws Exception {
                new Context() {{
                        runTest(() -> {
-                               final Exception testException = new 
Exception("test leader Exeption");
+                               final Exception testException = new 
Exception("test leader exception");
 
                                leaderElectionService.stop();
-                               
testingLeaderElectionDriver.getFatalErrorHandler().onFatalError(testException);
+                               
testingLeaderElectionDriver.onFatalError(testException);
+
+                               try {
+                                       testingContender.waitForError(timeout);

Review comment:
       Is the timeout really 3 seconds long? This might be a bit long for this 
test case. I would suggest to wait at most 100 or even 50 ms.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientITCase.java
##########
@@ -107,7 +108,7 @@ public void testCheckAndUpdateConfigMapConcurrently() 
throws Exception {
                                                // noop
                                        }
                                }
-                       }, kubernetesResource.getExecutorService()));
+                       }));

Review comment:
       I think we should not use the `ForkJoinPool#commonPool` here. Instead I 
would suggest to start a testing executor.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalServiceTest.java
##########
@@ -75,24 +77,31 @@ public void testNotifyLeaderAddressEmpty() throws Exception 
{
        public void testErrorForwarding() throws Exception {
                new Context() {{
                        runTest(() -> {
-                               final Exception testException = new 
Exception("test Exeption");
+                               final Exception testException = new 
Exception("test exception");
 
-                               
testingLeaderRetrievalDriver.getFatalErrorHandler().onFatalError(testException);
+                               
testingLeaderRetrievalDriver.onFatalError(testException);
 
-                               
assertThat(testingListener.getError().getMessage(), 
containsString(testException.getMessage()));
+                               testingListener.waitForError(timeout);
+                               assertThat(testingListener.getError(), 
FlinkMatchers.containsCause(testException));
                        });
                }};
        }
 
        @Test
-       public void testErrorHappenAfterStop() throws Exception {
+       public void testErrorIsIgnoredAfterBeingStop() throws Exception {
                new Context() {{
                        runTest(() -> {
-                               final Exception testException = new 
Exception("test Exeption");
+                               final Exception testException = new 
Exception("test exception");
 
                                leaderRetrievalService.stop();
-                               
testingLeaderRetrievalDriver.getFatalErrorHandler().onFatalError(testException);
-
+                               
testingLeaderRetrievalDriver.onFatalError(testException);
+
+                               try {
+                                       testingListener.waitForError(timeout);

Review comment:
       Same here with the timeout.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
##########
@@ -118,7 +122,9 @@ public void 
testWriteLeaderInformationWhenConfigMapNotExist() throws Exception {
                                        final String errorMsg = "Could not 
write leader information since ConfigMap "
                                                + LEADER_CONFIGMAP_NAME + " 
does not exist.";
                                        
assertThat(electionEventHandler.getError(), is(notNullValue()));
-                                       
assertThat(electionEventHandler.getError().getMessage(), 
containsString(errorMsg));
+                                       assertThat(
+                                               electionEventHandler.getError(),
+                                               FlinkMatchers.containsCause(new 
KubernetesException(errorMsg)));

Review comment:
       Maybe it makes sense to add a new matcher which only checks the message: 
`FlinkMatchers.containsMessage(errorMsg)`. That way the test would not depend 
on the actually thrown exception type (here `KubernetesException`).

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
##########
@@ -80,21 +75,25 @@ public void testMultipleKubernetesLeaderElectors() throws 
Exception {
                        }
 
                        // Wait for the first leader
-                       final String firstLockIdentity = 
leaderStore.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                       final String firstLockIdentity = 
TestingLeaderCallbackHandler.waitUntilNewLeaderAppear(TIMEOUT);

Review comment:
       nit: `waitUntilNewLeaderAppears`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to