xintongsong commented on code in PR #24163:
URL: https://github.com/apache/flink/pull/24163#discussion_r1494040971


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java:
##########
@@ -412,11 +415,9 @@ private void stopPod(String podName) {
                         });
     }
 
-    private Optional<KubernetesWatch> watchTaskManagerPods() throws Exception {
-        return Optional.of(
-                flinkKubeClient.watchPodsAndDoCallback(
-                        KubernetesUtils.getTaskManagerSelectors(clusterId),
-                        new PodCallbackHandlerImpl()));
+    private CompletableFuture<KubernetesWatch> watchTaskManagerPods() throws Exception {
+        return flinkKubeClient.watchPodsAndDoCallback(
+                KubernetesUtils.getTaskManagerSelectors(clusterId), new PodCallbackHandlerImpl());

Review Comment:
   It would be helpful to log a message when the future completes.
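
   A minimal sketch of what such logging could look like, assuming a generic helper rather than the actual driver code. `WatchLoggingSketch` and `logOnCompletion` are hypothetical names, and `java.util.logging` is used only to keep the example dependency-free (Flink itself logs through SLF4J).

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

final class WatchLoggingSketch {
    // java.util.logging keeps the sketch self-contained; this is not Flink's logger.
    private static final Logger LOG = Logger.getLogger(WatchLoggingSketch.class.getName());

    /**
     * Attaches a logging stage to the given future and returns the dependent future.
     * The original outcome (value or failure) is propagated to the returned future.
     */
    static <T> CompletableFuture<T> logOnCompletion(CompletableFuture<T> future, String what) {
        return future.whenComplete(
                (result, error) -> {
                    if (error != null) {
                        LOG.log(Level.WARNING, "Failed to create " + what, error);
                    } else {
                        LOG.info("Created " + what);
                    }
                });
    }
}
```

   In the driver this would wrap the future returned by `watchPodsAndDoCallback`, so both success and failure of the watch creation show up in the logs.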



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java:
##########
@@ -114,7 +116,8 @@ public KubernetesResourceManagerDriver(
 
     @Override
     protected void initializeInternal() throws Exception {
-        podsWatchOpt = watchTaskManagerPods();
+        podsWatchOptFuture = watchTaskManagerPods();
+        podsWatchOptFuture.get();

Review Comment:
   Why do we need to call `podsWatchOptFuture.get()` here? It blocks the RPC 
main thread.
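
   If waiting turns out not to be required, one possible shape is to keep the future and react to a failure in a callback instead of calling `get()`. This is only a sketch under that assumption: `NonBlockingInitSketch`, `initialize`, `onFatalError`, and `isWatchReady` are hypothetical names, and `AutoCloseable` stands in for `KubernetesWatch`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class NonBlockingInitSketch {
    // Stand-in for the driver's podsWatchOptFuture field.
    private CompletableFuture<AutoCloseable> watchFuture;

    /** Stores the pending watch and returns immediately; failures are reported via callback. */
    void initialize(CompletableFuture<AutoCloseable> pendingWatch, Consumer<Throwable> onFatalError) {
        watchFuture = pendingWatch;
        watchFuture.whenComplete(
                (watch, error) -> {
                    if (error != null) {
                        // Report the failure instead of throwing from a blocking get().
                        onFatalError.accept(error);
                    }
                });
    }

    /** True only once the watch has been created successfully. */
    boolean isWatchReady() {
        return watchFuture != null
                && watchFuture.isDone()
                && !watchFuture.isCompletedExceptionally();
    }
}
```

   Whether initialization may finish before the watch exists is exactly the open question above, so this trades a guaranteed-ready watch for a main thread that is never blocked.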



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java:
##########
@@ -452,10 +453,22 @@ public void handleError(Throwable throwable) {
                         .execute(
                                 () -> {
                                     if (running) {
-                                        podsWatchOpt.ifPresent(KubernetesWatch::close);
+                                        try {
+                                            if (podsWatchOptFuture.get() != null) {

Review Comment:
   This blocks the main thread. Is it necessary to wait for the previous watch 
to be closed before creating the new one? If yes, we should create the new 
watch in `podsWatchOptFuture.whenCompleteAsync()` to avoid blocking the main 
thread.
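
   A rough sketch of that idea, assuming the old watch must be closed before the new one is requested. `WatchRecreationSketch`, `Watch`, `recreate`, and `factory` are hypothetical stand-ins for the driver's types and methods; only the `CompletableFuture` calls are real API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

final class WatchRecreationSketch {
    /** Hypothetical stand-in for KubernetesWatch. */
    interface Watch {
        void close();
    }

    /**
     * Closes the previous watch once its future completes, then requests a new watch.
     * The callback runs on the given executor, so the caller never blocks on get().
     */
    static CompletableFuture<Watch> recreate(
            CompletableFuture<Watch> previous,
            Supplier<CompletableFuture<Watch>> factory,
            Executor mainThreadExecutor) {
        CompletableFuture<Watch> replacement = new CompletableFuture<>();
        previous.whenCompleteAsync(
                (oldWatch, error) -> {
                    // A failed previous watch has nothing to close; still create a new one.
                    if (oldWatch != null) {
                        oldWatch.close();
                    }
                    try {
                        factory.get()
                                .whenComplete(
                                        (newWatch, failure) -> {
                                            if (failure != null) {
                                                replacement.completeExceptionally(failure);
                                            } else {
                                                replacement.complete(newWatch);
                                            }
                                        });
                    } catch (Throwable t) {
                        // The factory itself threw before returning a future.
                        replacement.completeExceptionally(t);
                    }
                },
                mainThreadExecutor);
        return replacement;
    }
}
```

   The returned future can replace `podsWatchOptFuture`, so a later error handler chains onto the newest watch in the same way.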



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -454,6 +451,21 @@ void testWatchPodsAndDoCallback() throws Exception {
         assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
     }
 
+    @Test
+    void testWatchPodsAndDoCallbackFail() throws Exception {
+        mockWatchPodSuccessAfterFailTwoTimes(
+                NAMESPACE, KUBERNETES_ZERO_RESOURCE_VERSION, TESTING_LABELS);
+        TestingWatchCallbackHandler<KubernetesPod> watchCallbackHandler =
+                TestingWatchCallbackHandler.<KubernetesPod>builder().build();
+        // disable the retry of kubeClient
+        kubeClient.getConfiguration().setRequestRetryBackoffLimit(0);
+        assertThat(
+                        flinkKubeClient
+                                .watchPodsAndDoCallback(TESTING_LABELS, watchCallbackHandler)
+                                .get())

Review Comment:
   We should add a timeout, or the test case may hang for a long time.
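
   For illustration, the timed overload `get(long, TimeUnit)` bounds the wait and throws `TimeoutException` instead of hanging. `BoundedWaitSketch` and `await` are hypothetical names; the actual timeout value for the test is a judgment call and is not specified here.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWaitSketch {
    /**
     * Waits for the future at most the given duration.
     * Throws TimeoutException if the future has not completed in time.
     */
    static <T> T await(CompletableFuture<T> future, Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

   In the test above, this amounts to replacing the bare `.get()` with the timed variant, so a watch that never completes fails the test quickly.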



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java:
##########
@@ -149,9 +149,10 @@ public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
     }
 
     @Override
-    public KubernetesWatch watchPodsAndDoCallback(
+    public CompletableFuture<KubernetesWatch> watchPodsAndDoCallback(
             Map<String, String> labels, WatchCallbackHandler<KubernetesPod> podCallbackHandler) {
-        return watchPodsAndDoCallbackFunction.apply(labels, podCallbackHandler);
+        return CompletableFuture.supplyAsync(
+                () -> watchPodsAndDoCallbackFunction.apply(labels, podCallbackHandler));

Review Comment:
   I think we should change the return type of 
`watchPodsAndDoCallbackFunction` to match the method signature.
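
   A sketch of what that could look like, with simplified stand-in types. `TestingClientSketch`, `Watch`, and `Handler` are hypothetical; the real class uses `KubernetesWatch` and `WatchCallbackHandler<KubernetesPod>`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

final class TestingClientSketch {
    /** Hypothetical stand-in for KubernetesWatch. */
    interface Watch {}

    /** Hypothetical stand-in for WatchCallbackHandler<KubernetesPod>. */
    interface Handler {}

    // The injected function already returns a future, matching the method signature.
    private final BiFunction<Map<String, String>, Handler, CompletableFuture<Watch>> watchFunction;

    TestingClientSketch(
            BiFunction<Map<String, String>, Handler, CompletableFuture<Watch>> watchFunction) {
        this.watchFunction = watchFunction;
    }

    /** Delegates directly; no supplyAsync wrapper and no extra thread involved. */
    CompletableFuture<Watch> watchPodsAndDoCallback(Map<String, String> labels, Handler handler) {
        return watchFunction.apply(labels, handler);
    }
}
```

   With this shape a test can hand in an already completed, failed, or still pending future and get deterministic behavior, instead of depending on the common pool used by `supplyAsync`.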



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
