mateczagany commented on code in PR #777:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489494523


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, 
Configuration conf) throws
         }
     }
 
-    /** Wait until the FLink cluster has completely shut down. */
-    @VisibleForTesting
-    void waitForClusterShutdown(String namespace, String clusterId, long 
shutdownTimeout) {
-        LOG.info("Waiting for cluster shutdown...");
-
-        boolean jobManagerRunning = true;
-        boolean taskManagerRunning = true;
-        boolean serviceRunning = true;
+    /** Returns a list of Kubernetes Deployment names for given cluster. */
+    protected abstract List<String> getDeploymentNames(String namespace, 
String clusterId);
 
-        for (int i = 0; i < shutdownTimeout; i++) {
-            if (jobManagerRunning) {
-                PodList jmPodList = getJmPodList(namespace, clusterId);
+    /** Wait until the FLink cluster has completely shut down. */
+    protected void waitForClusterShutdown(
+            String namespace, String clusterId, long shutdownTimeout) {
+        long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
+        LOG.info("Waiting {} seconds for cluster shutdown...", 
shutdownTimeout);
 
-                if (jmPodList == null || jmPodList.getItems().isEmpty()) {
-                    jobManagerRunning = false;
-                }
-            }
-            if (taskManagerRunning) {
-                PodList tmPodList = getTmPodList(namespace, clusterId);
+        for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
+            long deploymentTimeout = timeoutAt - System.currentTimeMillis();
 
-                if (tmPodList.getItems().isEmpty()) {
-                    taskManagerRunning = false;
-                }
+            if (!waitForDeploymentToBeRemoved(namespace, deploymentName, 
deploymentTimeout)) {
+                LOG.error(
+                        "Failed to shut down cluster {} (deployment {}) in {} 
seconds, proceeding...",
+                        clusterId,
+                        deploymentName,
+                        shutdownTimeout);
+                return;
             }
+        }
+    }
 
-            if (serviceRunning) {
-                Service service =
-                        kubernetesClient
-                                .services()
-                                .inNamespace(namespace)
-                                .withName(
-                                        
ExternalServiceDecorator.getExternalServiceName(clusterId))
-                                .get();
-                if (service == null) {
-                    serviceRunning = false;
-                }
-            }
+    /** Wait until Deployment is removed, return false if timed out, otherwise 
return true. */
+    @VisibleForTesting
+    boolean waitForDeploymentToBeRemoved(String namespace, String 
deploymentName, long timeout) {
+        ScheduledExecutorService logger = 
Executors.newSingleThreadScheduledExecutor();
+        logger.scheduleWithFixedDelay(
+                () -> LOG.info("Waiting for Deployment {} to shut down...", 
deploymentName),
+                5,
+                5,
+                TimeUnit.SECONDS);

Review Comment:
   You are right, I will remove it. I don't think it's worth keeping those 
logs, since it's easy to figure out what we're waiting for if we have 
before/after logs as in your second suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to