This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 38690d06e00bb17fc13709ec1b7582c896ccb034 Author: Matthias Pohl <[email protected]> AuthorDate: Mon Jan 29 09:28:27 2024 +0100 [hotfix][test] Refactors TestingLeaderCallbackHandler to allow async calls This way, we can use FlinkAssertions#assertThatFuture and use assertion messages instead of comments. --- .../resources/TestingLeaderCallbackHandler.java | 42 ++++++++++++++++------ 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java index bdfd86ff534..997128efd55 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java @@ -18,9 +18,11 @@ package org.apache.flink.kubernetes.kubeclient.resources; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.util.Preconditions; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.LinkedBlockingQueue; /** Testing implementation for {@link KubernetesLeaderElector.LeaderCallbackHandler}. */ @@ -60,22 +62,42 @@ public class TestingLeaderCallbackHandler extends KubernetesLeaderElector.Leader } public static String waitUntilNewLeaderAppears() throws Exception { - return sharedQueue.take(); + return retrieveNextEventAsync(sharedQueue).get(); + } + + public static CompletableFuture<String> retrieveNextEventAsync( + BlockingQueue<String> eventQueue) { + return CompletableFuture.supplyAsync( + () -> { + try { + return eventQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CompletionException(e); + } + }); } public void waitForNewLeader() throws Exception { - poll(leaderQueue); + waitForNewLeaderAsync().get(); + } + + public CompletableFuture<Void> waitForNewLeaderAsync() { + return waitForNextEvent(leaderQueue); } public void waitForRevokeLeader() throws Exception { - poll(revokeQueue); + waitForRevokeLeaderAsync().get(); } - private void poll(BlockingQueue<String> queue) throws Exception { - CommonTestUtils.waitUntilCondition( - () -> { - final String lockIdentity = queue.take(); - return this.lockIdentity.equals(lockIdentity); - }); + public CompletableFuture<Void> waitForRevokeLeaderAsync() { + return waitForNextEvent(revokeQueue); + } + + private CompletableFuture<Void> waitForNextEvent(BlockingQueue<String> eventQueue) { + return retrieveNextEventAsync(eventQueue) + .thenAccept( + eventLockIdentity -> + Preconditions.checkState(eventLockIdentity.equals(lockIdentity))); } }
