wangyang0918 commented on a change in pull request #28:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816390801
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }

-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }

+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might be recreated
Review comment:
```suggestion
            // We need to wait for cluster shutdown otherwise configmaps might be recreated
```
Typo
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }

-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }

+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */
Review comment:
I am confused by this comment. @gyfora Do you know what is buggy about the flink kube cluster client?
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }

-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }

+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, String namespace, String clusterId) {
+
+        boolean jobManagerRunning = true;
+        boolean serviceRunning = true;
+
+        for (int i = 0; i < 60; i++) {
+            if (jobManagerRunning) {
Review comment:
This minor refactor also fixes [FLINK-26344](https://issues.apache.org/jira/browse/FLINK-26344).
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }

-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }

+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, String namespace, String clusterId) {
+
+        boolean jobManagerRunning = true;
+        boolean serviceRunning = true;
+
+        for (int i = 0; i < 60; i++) {
+            if (jobManagerRunning) {
+                PodList jmPodList =
+                        kubernetesClient
+                                .pods()
+                                .inNamespace(namespace)
+                                .withLabel(
+                                        Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE)
+                                .withLabel(
+                                        Constants.LABEL_COMPONENT_KEY,
+                                        Constants.LABEL_COMPONENT_JOB_MANAGER)
+                                .withLabel(Constants.LABEL_APP_KEY, clusterId)
Review comment:
Could be simplified by `KubernetesUtils.getJobManagerSelectors(clusterId)`.
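A possible shape for that simplification, assuming `getJobManagerSelectors` returns the same three JobManager labels as a map, could be:
```suggestion
                                .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
```
This would collapse the three `.withLabel(...)` calls into a single `.withLabels(...)` call and keep the selector definition in one place.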
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]