bgeng777 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817672374



##########
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,81 +47,22 @@ public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkServi
         this.flinkService = flinkService;
     }
 
-    public boolean removeDeployment(FlinkDeployment flinkApp) {
-        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
-    }
-
     public abstract UpdateControl<FlinkDeployment> reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
             Configuration effectiveConfig)
             throws Exception;
 
-    protected JobManagerDeploymentStatus checkJobManagerDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
-            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
-            if (deployment.isPresent()) {
-                DeploymentStatus status = deployment.get().getStatus();
-                DeploymentSpec spec = deployment.get().getSpec();
-                if (status != null
-                        && status.getAvailableReplicas() != null
-                        && spec.getReplicas().intValue() == status.getReplicas()
-                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
-                    // typically it takes a few seconds for the REST server to be ready
-                    if (flinkService.isJobManagerPortReady(effectiveConfig)) {
-                        LOG.info(
-                                "JobManager deployment {} in namespace {} is ready",
-                                flinkApp.getMetadata().getName(),
-                                flinkApp.getMetadata().getNamespace());
-                        jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return JobManagerDeploymentStatus.READY;
-                        }
-                    }
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} port not ready",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace());
-                    return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
-                }
-                LOG.info(
-                        "JobManager deployment {} in namespace {} not yet ready, status {}",
-                        flinkApp.getMetadata().getName(),
-                        flinkApp.getMetadata().getNamespace(),
-                        status);
-
-                return JobManagerDeploymentStatus.DEPLOYING;
-            }
-            return JobManagerDeploymentStatus.MISSING;
-        }
-        return JobManagerDeploymentStatus.READY;
-    }
-
-    /**
-     * Shuts down the job and deletes all kubernetes resources including k8s HA resources. It will
-     * first perform a graceful shutdown (cancel) before deleting the data if that is unsuccessful
-     * in order to avoid leaking HA metadata to durable storage.
-     *
-     * <p>This feature is limited at the moment to cleaning up native kubernetes HA resources, other
-     * HA providers like ZK need to be cleaned up manually after deletion.
-     */
     public DeleteControl shutdownAndDelete(
-            String operatorNamespace,
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig) {
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
 
-        if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                == JobManagerDeploymentStatus.READY) {
+        if (JobManagerDeploymentStatus.READY
+                == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
             shutdown(flinkApp, effectiveConfig);
         } else {
             FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
         }

Review comment:
       Should we also call `FlinkUtils.waitForClusterShutdown()` here? I find that `FlinkUtils.waitForClusterShutdown()` is only used for session clusters, and I notice that `waitForClusterShutdown` waits until the JM deployment and the other services are cleared. Why is `deleteCluster` not enough? If it cannot fulfill our requirements, should we wait for the other services to be cleared in the `else` branch here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
