This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38690d06e00bb17fc13709ec1b7582c896ccb034
Author: Matthias Pohl <[email protected]>
AuthorDate: Mon Jan 29 09:28:27 2024 +0100

    [hotfix][test] Refactors TestingLeaderCallbackHandler to allow async calls
    
    This way, we can use FlinkAssertions#assertThatFuture and use assertion
    messages instead of comments.
---
 .../resources/TestingLeaderCallbackHandler.java    | 42 ++++++++++++++++------
 1 file changed, 32 insertions(+), 10 deletions(-)

diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java
index bdfd86ff534..997128efd55 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.kubernetes.kubeclient.resources;
 
-import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /** Testing implementation for {@link KubernetesLeaderElector.LeaderCallbackHandler}. */
@@ -60,22 +62,42 @@ public class TestingLeaderCallbackHandler extends KubernetesLeaderElector.Leader
     }
 
     public static String waitUntilNewLeaderAppears() throws Exception {
-        return sharedQueue.take();
+        return retrieveNextEventAsync(sharedQueue).get();
+    }
+
+    public static CompletableFuture<String> retrieveNextEventAsync(
+            BlockingQueue<String> eventQueue) {
+        return CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return eventQueue.take();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new CompletionException(e);
+                    }
+                });
     }
 
     public void waitForNewLeader() throws Exception {
-        poll(leaderQueue);
+        waitForNewLeaderAsync().get();
+    }
+
+    public CompletableFuture<Void> waitForNewLeaderAsync() {
+        return waitForNextEvent(leaderQueue);
     }
 
     public void waitForRevokeLeader() throws Exception {
-        poll(revokeQueue);
+        waitForRevokeLeaderAsync().get();
     }
 
-    private void poll(BlockingQueue<String> queue) throws Exception {
-        CommonTestUtils.waitUntilCondition(
-                () -> {
-                    final String lockIdentity = queue.take();
-                    return this.lockIdentity.equals(lockIdentity);
-                });
+    public CompletableFuture<Void> waitForRevokeLeaderAsync() {
+        return waitForNextEvent(revokeQueue);
+    }
+
+    private CompletableFuture<Void> waitForNextEvent(BlockingQueue<String> 
eventQueue) {
+        return retrieveNextEventAsync(eventQueue)
+                .thenAccept(
+                        eventLockIdentity ->
+                                
Preconditions.checkState(eventLockIdentity.equals(lockIdentity)));
     }
 }

Reply via email to