wangyang0918 commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816390801



##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might 
be recreated

Review comment:
       ```suggestion
               // We need to wait for cluster shutdown otherwise configmaps 
might be recreated
   ```
   
   Typo

##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might 
be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for 
now. */

Review comment:
       I am confusing about this comment. @gyfora Do you know what is the buggy 
of flink kube cluster client?

##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might 
be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for 
now. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, String namespace, String 
clusterId) {
+
+        boolean jobManagerRunning = true;
+        boolean serviceRunning = true;
+
+        for (int i = 0; i < 60; i++) {
+            if (jobManagerRunning) {

Review comment:
       This minor refactor also fixes 
[FLINK-26344](https://issues.apache.org/jira/browse/FLINK-26344).

##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might 
be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for 
now. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, String namespace, String 
clusterId) {
+
+        boolean jobManagerRunning = true;
+        boolean serviceRunning = true;
+
+        for (int i = 0; i < 60; i++) {
+            if (jobManagerRunning) {
+                PodList jmPodList =
+                        kubernetesClient
+                                .pods()
+                                .inNamespace(namespace)
+                                .withLabel(
+                                        Constants.LABEL_TYPE_KEY, 
Constants.LABEL_TYPE_NATIVE_TYPE)
+                                .withLabel(
+                                        Constants.LABEL_COMPONENT_KEY,
+                                        Constants.LABEL_COMPONENT_JOB_MANAGER)
+                                .withLabel(Constants.LABEL_APP_KEY, clusterId)

Review comment:
       Could be simplified by 
`KubernetesUtils.getJobManagerSelectors(clusterId)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to