Aitozi commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815701617



##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+
+    /** JobManager is running and ready to receive REST API calls. */
+    READY,
+
+    /** JobManager is running but not ready yet to receive REST API calls. */
+    DEPLOYED_NOT_READY,
+
+    /** JobManager process is starting up. */
+    DEPLOYING,
+
+    /** JobManager deployment not found, probably not started or killed by 
user. */
+    MISSING;
+
+    public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment 
flinkDeployment) {
+        switch (this) {
+            case DEPLOYING:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+            case DEPLOYED_NOT_READY:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(PORT_READY_DELAY_SECONDS, 
TimeUnit.SECONDS);

Review comment:
       Maybe make this configurable , I will create a ticket for this.

##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
                 .withName(clusterId)
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {

Review comment:
       This seems only handle the situation for HA based on ConfigMap. Will 
this also work for the HA based on ZK ?

##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,45 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         
jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - 
proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;
                     }
                     LOG.info(
                             "JobManager deployment {} in namespace {} port not 
ready",
                             flinkApp.getMetadata().getName(),
                             flinkApp.getMetadata().getNamespace());
-                    return UpdateControl.updateStatus(flinkApp)
-                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, 
TimeUnit.SECONDS);
+                    return JobDeploymentStatus.DEPLOYED_NOT_READY;
                 }
                 LOG.info(
                         "JobManager deployment {} in namespace {} not yet 
ready, status {}",
                         flinkApp.getMetadata().getName(),
                         flinkApp.getMetadata().getNamespace(),
                         status);
-                // TODO: how frequently do we want here
-                return UpdateControl.updateStatus(flinkApp)
-                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+                return JobDeploymentStatus.DEPLOYING;
             }
+            return JobDeploymentStatus.MISSING;
+        }
+        return JobDeploymentStatus.READY;
+    }
+
+    public DeleteControl shutdownAndDelete(

Review comment:
       Can we document here to clarify that what is to be deleted ? Like HA 
data and cluster resource ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to