xintongsong commented on code in PR #24163:
URL: https://github.com/apache/flink/pull/24163#discussion_r1496940478


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java:
##########
@@ -90,7 +92,7 @@ public class KubernetesResourceManagerDriver
     /** Current max pod index. When creating a new pod, it should increase one. */
     private long currentMaxPodId = 0;
 
-    private Optional<KubernetesWatch> podsWatchOpt;
+    private CompletableFuture<KubernetesWatch> podsWatchOptFuture;

Review Comment:
   ```suggestion
       private CompletableFuture<KubernetesWatch> podsWatchOptFuture =
               FutureUtils.completedExceptionally(
                       new ResourceManagerException(
                               "KubernetesResourceManagerDriver is not initialized."));
   ```



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java:
##########
@@ -450,17 +462,30 @@ public void handleError(Throwable throwable) {
             if (throwable instanceof KubernetesTooOldResourceVersionException) {
                 getMainThreadExecutor()
                         .execute(
-                                () -> {
-                                    if (running) {
-                                        podsWatchOpt.ifPresent(KubernetesWatch::close);
-                                        log.info("Creating a new watch on TaskManager pods.");
-                                        try {
-                                            podsWatchOpt = watchTaskManagerPods();
-                                        } catch (Exception e) {
-                                            getResourceEventHandler().onError(e);
-                                        }
-                                    }
-                                });
+                                () ->
+                                        podsWatchOptFuture.whenCompleteAsync(
+                                                (KubernetesWatch watch, Throwable throwable1) -> {
+                                                    if (running) {
+                                                        try {
+                                                            if (watch != null) {
+                                                                watch.close();
+                                                            }
+                                                        } catch (Exception e) {
+                                                            log.warn(
+                                                                    "Error when get old watch to close, which is not supposed to happen",
+                                                                    e);
+                                                        }
+                                                        log.info(
+                                                                "Creating a new watch on TaskManager pods.");
+                                                        try {
+                                                            podsWatchOptFuture =
+                                                                    watchTaskManagerPods();
+                                                        } catch (Exception e) {
+                                                            getResourceEventHandler().onError(e);
+                                                        }
+                                                    }
+                                                },
+                                                getMainThreadExecutor()));

Review Comment:
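   The outer `getMainThreadExecutor().execute()` is unnecessary:
   `whenCompleteAsync(..., getMainThreadExecutor())` already runs the callback
   on the main thread executor.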
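
To illustrate the point, here is a minimal plain-JDK sketch, not the Flink code itself. The class name, the `main-thread-sketch` thread name, and the single-thread executor standing in for `getMainThreadExecutor()` are all assumptions made for the example. It shows that the executor passed to `whenCompleteAsync` already decides where the callback runs, without an outer `execute()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadCallbackSketch {

    /** Returns the name of the thread that ran the whenCompleteAsync callback. */
    public static String callbackThreadName() throws Exception {
        // Hypothetical stand-in for the executor returned by getMainThreadExecutor().
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-sketch"));
        try {
            // Stand-in for podsWatchOptFuture; a String replaces KubernetesWatch here.
            CompletableFuture<String> watchFuture = CompletableFuture.completedFuture("watch");
            CompletableFuture<String> threadName = new CompletableFuture<>();

            // No outer mainThread.execute(...): the executor argument already
            // schedules the callback on the main thread.
            watchFuture.whenCompleteAsync(
                    (watch, error) -> threadName.complete(Thread.currentThread().getName()),
                    mainThread);

            // Blocks until the callback has run on the executor.
            return threadName.get();
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callbackThreadName());
    }
}
```

One difference to keep in mind: without the outer call, the future field itself is read on the calling thread, and only the callback runs on the executor.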



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java:
##########
@@ -90,7 +92,7 @@ public class KubernetesResourceManagerDriver
     /** Current max pod index. When creating a new pod, it should increase one. */
     private long currentMaxPodId = 0;
 
-    private Optional<KubernetesWatch> podsWatchOpt;
+    private CompletableFuture<KubernetesWatch> podsWatchOptFuture;

Review Comment:
   Initializing the field with an exceptionally completed future, as in the
   suggestion above, helps prevent an NPE if `podsWatchOptFuture` is
   accidentally used before it is initialized.
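
A small plain-JDK sketch of the idea, under stated assumptions: `UninitializedWatchSketch`, its `Watch` interface, and the `failed` helper are hypothetical stand-ins (for the driver, `KubernetesWatch`, and `FutureUtils.completedExceptionally` respectively), and `IllegalStateException` replaces `ResourceManagerException`. Because the field is never null, a caller that chains on it before initialization sees a descriptive failure rather than an NPE.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class UninitializedWatchSketch {

    /** Hypothetical stand-in for KubernetesWatch. */
    public interface Watch {
        void close();
    }

    /** Plain-JDK way to build a future that is already completed exceptionally. */
    static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    // Never null, so callers can always chain on it, even before initialize().
    private CompletableFuture<Watch> watchFuture =
            failed(new IllegalStateException("Driver is not initialized."));

    /** Sets the watch, as the real driver would do during initialization. */
    public void initialize(Watch watch) {
        watchFuture = CompletableFuture.completedFuture(watch);
    }

    /** Closes the watch if present; otherwise reports why there is none. */
    public String closeWatch() {
        AtomicReference<String> outcome = new AtomicReference<>();
        // The future is already completed in both cases, so the callback
        // runs synchronously on the calling thread.
        watchFuture.whenComplete(
                (watch, error) -> {
                    if (watch != null) {
                        watch.close();
                        outcome.set("closed");
                    } else {
                        outcome.set(error.getMessage());
                    }
                });
        return outcome.get();
    }
}
```

With a plain `null` default, the same `closeWatch()` call before `initialize()` would throw a `NullPointerException` at `watchFuture.whenComplete(...)`.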



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
