zhuzhurk commented on code in PR #20169:
URL: https://github.com/apache/flink/pull/20169#discussion_r914370881
##########
flink-runtime/src/test/java/org/apache/flink/runtime/blocklist/DefaultBlocklistHandlerTest.java:
##########
@@ -77,24 +87,53 @@ void testAddNewBlockedNodes() throws Exception {
@Test
void testRemoveTimeoutNodes() throws Exception {
+ final TestingComponentMainThreadExecutor mainThreadExecutor =
+ new TestingComponentMainThreadExecutor(
+ ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ EXECUTOR_EXTENSION.getExecutor()));
+
long currentTimestamp = System.currentTimeMillis();
BlockedNode node1 = new BlockedNode("node1", "cause", currentTimestamp + 1000L);
BlockedNode node2 = new BlockedNode("node2", "cause", currentTimestamp + 3000L);
TestBlocklistContext context = new TestBlocklistContext();
- try (DefaultBlocklistHandler handler = createDefaultBlocklistHandler(context)) {
-
- handler.addNewBlockedNodes(Arrays.asList(node1, node2));
- assertThat(handler.getAllBlockedNodeIds()).hasSize(2);
- assertThat(context.allUnBlockedNodes).hasSize(0);
+ try (DefaultBlocklistHandler handler =
+ createDefaultBlocklistHandler(
+ context, mainThreadExecutor.getMainThreadExecutor())) {
+ mainThreadExecutor.execute(
+ () -> {
+ handler.addNewBlockedNodes(Arrays.asList(node1, node2));
+ assertThat(handler.getAllBlockedNodeIds()).hasSize(2);
+ assertThat(context.allUnBlockedNodes).hasSize(0);
+ });
// wait node1 timeout
- CommonTestUtils.waitUntilCondition(() -> handler.getAllBlockedNodeIds().size() == 1);
- assertThat(context.allUnBlockedNodes).containsExactly(node1);
+ CommonTestUtils.waitUntilCondition(
+ () ->
+ mainThreadExecutor.execute(
+ () -> {
+ int nodes = handler.getAllBlockedNodeIds().size();
+ if (nodes == 1) {
+ assertThat(context.allUnBlockedNodes)
+ .containsExactly(node1);
+ }
+ return nodes;
+ })
+ == 1);
// wait node2 timeout
- CommonTestUtils.waitUntilCondition(() -> handler.getAllBlockedNodeIds().size() == 0);
- assertThat(context.allUnBlockedNodes).containsExactly(node1, node2);
+ CommonTestUtils.waitUntilCondition(
Review Comment:
I'm afraid the test can still be unstable if both nodes are already
unblocked by the time the first check (nodes == 1) runs. This can happen
if the environment is very slow: the blocklist size then goes from 2
straight to 0 between two polls, so the condition nodes == 1 is never
observed as true.
I would propose changing the test so that it first brings the job to one
stable state and does the first check, and then takes some explicit action
to bring the job to another stable state and does the second check.
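
To illustrate the idea, here is a minimal sketch. It does not use Flink's actual classes: `ToyBlocklist` and its methods are hypothetical stand-ins, and the assumption is that the test controls the time passed to the timeout check instead of relying on the wall clock. Each check then runs only after an explicit action has moved the state forward.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for illustration only, not Flink's DefaultBlocklistHandler. */
public class StableStateSketch {

    /** Toy blocklist whose notion of "now" is supplied by the caller. */
    static final class ToyBlocklist {
        private final Map<String, Long> endTimestamps = new LinkedHashMap<>();
        final List<String> unblocked = new ArrayList<>();

        void block(String nodeId, long endTimestamp) {
            endTimestamps.put(nodeId, endTimestamp);
        }

        /** Removes every node whose end timestamp is at or before the given time. */
        void removeTimeoutNodes(long now) {
            endTimestamps.entrySet().removeIf(entry -> {
                if (entry.getValue() <= now) {
                    unblocked.add(entry.getKey());
                    return true;
                }
                return false;
            });
        }

        int size() {
            return endTimestamps.size();
        }
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        ToyBlocklist blocklist = new ToyBlocklist();
        blocklist.block("node1", 1000L);
        blocklist.block("node2", 3000L);
        check(blocklist.size() == 2, "both nodes should be blocked");

        // Action 1: advance time past node1's timeout only, then check.
        blocklist.removeTimeoutNodes(1000L);
        check(blocklist.size() == 1, "only node2 should remain blocked");
        check(blocklist.unblocked.equals(List.of("node1")), "node1 should be unblocked");

        // Action 2: advance time past node2's timeout, then check again.
        blocklist.removeTimeoutNodes(3000L);
        check(blocklist.size() == 0, "no node should remain blocked");
        check(blocklist.unblocked.equals(List.of("node1", "node2")), "both should be unblocked");
    }
}
```

Because the time only advances when the test says so, the intermediate state with exactly one blocked node cannot be skipped, no matter how slow the machine is.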