tillrohrmann commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r516800376
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderInformation.java
##########
@@ -74,7 +78,7 @@ public int hashCode() {
return result;
}
- public static LeaderInformation known(@Nullable UUID leaderSessionID,
@Nullable String leaderAddress) {
+ public static LeaderInformation known(UUID leaderSessionID, String
leaderAddress) {
Review comment:
Add an assertion that `leaderSessionID` and `leaderAddress` are not null.
Otherwise this method will allow us to break `isEmpty`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
##########
@@ -189,7 +190,7 @@ public void testStopAtNonRetryableException() {
retryFuture.get();
fail("Exception should be thrown.");
} catch (Exception ex) {
- assertThat(ExceptionUtils.findThrowableWithMessage(ex,
notRetryExceptionMsg).isPresent(), is(true));
+ assertThat(ex, FlinkMatchers.containsCause(new
FlinkRuntimeException(notRetryExceptionMsg)));
Review comment:
nit: `new FlinkRuntimeException(notRetryExceptionMsg)` could be
deduplicated.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
##########
@@ -177,25 +178,32 @@ public void testOldConfirmLeaderInformation() throws
Exception {
public void testErrorForwarding() throws Exception {
new Context() {{
runTest(() -> {
- final Exception testException = new
Exception("test leader Exeption");
+ final Exception testException = new
Exception("test leader exception");
-
testingLeaderElectionDriver.getFatalErrorHandler().onFatalError(testException);
+
testingLeaderElectionDriver.onFatalError(testException);
testingContender.waitForError(timeout);
assertThat(testingContender.getError(),
is(notNullValue()));
-
assertThat(testingContender.getError().getMessage(),
containsString(testException.getMessage()));
+ assertThat(testingContender.getError(),
FlinkMatchers.containsCause(testException));
});
}};
}
@Test
- public void testErrorHappenAfterStop() throws Exception {
+ public void testErrorIsIgnoredAfterBeingStop() throws Exception {
new Context() {{
runTest(() -> {
- final Exception testException = new
Exception("test leader Exeption");
+ final Exception testException = new
Exception("test leader exception");
leaderElectionService.stop();
-
testingLeaderElectionDriver.getFatalErrorHandler().onFatalError(testException);
+
testingLeaderElectionDriver.onFatalError(testException);
+
+ try {
+ testingContender.waitForError(timeout);
Review comment:
Is the timeout really 3 seconds long? This might be a bit long for this
test case. I would suggest waiting at most 100 or even 50 ms.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientITCase.java
##########
@@ -107,7 +108,7 @@ public void testCheckAndUpdateConfigMapConcurrently()
throws Exception {
// noop
}
}
- }, kubernetesResource.getExecutorService()));
+ }));
Review comment:
I think we should not use the `ForkJoinPool#commonPool` here. Instead I
would suggest starting a testing executor.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalServiceTest.java
##########
@@ -75,24 +77,31 @@ public void testNotifyLeaderAddressEmpty() throws Exception
{
public void testErrorForwarding() throws Exception {
new Context() {{
runTest(() -> {
- final Exception testException = new
Exception("test Exeption");
+ final Exception testException = new
Exception("test exception");
-
testingLeaderRetrievalDriver.getFatalErrorHandler().onFatalError(testException);
+
testingLeaderRetrievalDriver.onFatalError(testException);
-
assertThat(testingListener.getError().getMessage(),
containsString(testException.getMessage()));
+ testingListener.waitForError(timeout);
+ assertThat(testingListener.getError(),
FlinkMatchers.containsCause(testException));
});
}};
}
@Test
- public void testErrorHappenAfterStop() throws Exception {
+ public void testErrorIsIgnoredAfterBeingStop() throws Exception {
new Context() {{
runTest(() -> {
- final Exception testException = new
Exception("test Exeption");
+ final Exception testException = new
Exception("test exception");
leaderRetrievalService.stop();
-
testingLeaderRetrievalDriver.getFatalErrorHandler().onFatalError(testException);
-
+
testingLeaderRetrievalDriver.onFatalError(testException);
+
+ try {
+ testingListener.waitForError(timeout);
Review comment:
Same here with the timeout (see the comment on `DefaultLeaderElectionServiceTest` above).
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java
##########
@@ -118,7 +122,9 @@ public void
testWriteLeaderInformationWhenConfigMapNotExist() throws Exception {
final String errorMsg = "Could not
write leader information since ConfigMap "
+ LEADER_CONFIGMAP_NAME + "
does not exist.";
assertThat(electionEventHandler.getError(), is(notNullValue()));
-
assertThat(electionEventHandler.getError().getMessage(),
containsString(errorMsg));
+ assertThat(
+ electionEventHandler.getError(),
+ FlinkMatchers.containsCause(new
KubernetesException(errorMsg)));
Review comment:
Maybe it makes sense to add a new matcher which only checks the message:
`FlinkMatchers.containsMessage(errorMsg)`. That way the test would not depend
on the actual type of the thrown exception (here `KubernetesException`).
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
##########
@@ -80,21 +75,25 @@ public void testMultipleKubernetesLeaderElectors() throws
Exception {
}
// Wait for the first leader
- final String firstLockIdentity =
leaderStore.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+ final String firstLockIdentity =
TestingLeaderCallbackHandler.waitUntilNewLeaderAppear(TIMEOUT);
Review comment:
nit: `waitUntilNewLeaderAppears`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]