gyfora commented on a change in pull request #26:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817676014



##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,81 +47,22 @@ public BaseReconciler(KubernetesClient kubernetesClient, 
FlinkService flinkServi
         this.flinkService = flinkService;
     }
 
-    public boolean removeDeployment(FlinkDeployment flinkApp) {
-        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
-    }
-
     public abstract UpdateControl<FlinkDeployment> reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
             Configuration effectiveConfig)
             throws Exception;
 
-    protected JobManagerDeploymentStatus checkJobManagerDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig) {
-        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
-            Optional<Deployment> deployment = 
context.getSecondaryResource(Deployment.class);
-            if (deployment.isPresent()) {
-                DeploymentStatus status = deployment.get().getStatus();
-                DeploymentSpec spec = deployment.get().getSpec();
-                if (status != null
-                        && status.getAvailableReplicas() != null
-                        && spec.getReplicas().intValue() == 
status.getReplicas()
-                        && spec.getReplicas().intValue() == 
status.getAvailableReplicas()) {
-                    // typically it takes a few seconds for the REST server to 
be ready
-                    if (flinkService.isJobManagerPortReady(effectiveConfig)) {
-                        LOG.info(
-                                "JobManager deployment {} in namespace {} is 
ready",
-                                flinkApp.getMetadata().getName(),
-                                flinkApp.getMetadata().getNamespace());
-                        
jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - 
proceed with
-                            // reconciliation
-                            return JobManagerDeploymentStatus.READY;
-                        }
-                    }
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} port not 
ready",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace());
-                    return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
-                }
-                LOG.info(
-                        "JobManager deployment {} in namespace {} not yet 
ready, status {}",
-                        flinkApp.getMetadata().getName(),
-                        flinkApp.getMetadata().getNamespace(),
-                        status);
-
-                return JobManagerDeploymentStatus.DEPLOYING;
-            }
-            return JobManagerDeploymentStatus.MISSING;
-        }
-        return JobManagerDeploymentStatus.READY;
-    }
-
-    /**
-     * Shuts down the job and deletes all kubernetes resources including k8s 
HA resources. It will
-     * first perform a graceful shutdown (cancel) before deleting the data if 
that is unsuccessful
-     * in order to avoid leaking HA metadata to durable storage.
-     *
-     * <p>This feature is limited at the moment to cleaning up native 
kubernetes HA resources, other
-     * HA providers like ZK need to be cleaned up manually after deletion.
-     */
     public DeleteControl shutdownAndDelete(
-            String operatorNamespace,
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig) {
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
 
-        if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                == JobManagerDeploymentStatus.READY) {
+        if (JobManagerDeploymentStatus.READY
+                == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
             shutdown(flinkApp, effectiveConfig);
         } else {
             FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
         }

Review comment:
       FlinkUtils.deleteCluster will call waitForClusterShutdown internally if 
deleteHaConfigmaps is set to true. This was introduced recently to clean up 
properly on deletion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to