bgeng777 commented on a change in pull request #26:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817672374
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,81 +47,22 @@ public BaseReconciler(KubernetesClient kubernetesClient,
FlinkService flinkServi
this.flinkService = flinkService;
}
- public boolean removeDeployment(FlinkDeployment flinkApp) {
- return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
- }
-
public abstract UpdateControl<FlinkDeployment> reconcile(
String operatorNamespace,
FlinkDeployment flinkApp,
Context context,
Configuration effectiveConfig)
throws Exception;
- protected JobManagerDeploymentStatus checkJobManagerDeployment(
- FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
- if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
- Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
- if (deployment.isPresent()) {
- DeploymentStatus status = deployment.get().getStatus();
- DeploymentSpec spec = deployment.get().getSpec();
- if (status != null
- && status.getAvailableReplicas() != null
- && spec.getReplicas().intValue() == status.getReplicas()
- && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
- // typically it takes a few seconds for the REST server to be ready
- if (flinkService.isJobManagerPortReady(effectiveConfig)) {
- LOG.info(
- "JobManager deployment {} in namespace {} is ready",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace());
- jobManagerDeployments.add(flinkApp.getMetadata().getUid());
- if (flinkApp.getStatus().getJobStatus() != null) {
- // pre-existing deployments on operator restart - proceed with
- // reconciliation
- return JobManagerDeploymentStatus.READY;
- }
- }
- LOG.info(
- "JobManager deployment {} in namespace {} port not ready",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace());
- return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
- }
- LOG.info(
- "JobManager deployment {} in namespace {} not yet ready, status {}",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace(),
- status);
-
- return JobManagerDeploymentStatus.DEPLOYING;
- }
- return JobManagerDeploymentStatus.MISSING;
- }
- return JobManagerDeploymentStatus.READY;
- }
-
- /**
- * Shuts down the job and deletes all kubernetes resources including k8s HA resources. It will
- * first perform a graceful shutdown (cancel) before deleting the data if that is unsuccessful
- * in order to avoid leaking HA metadata to durable storage.
- *
- * <p>This feature is limited at the moment to cleaning up native kubernetes HA resources, other
- * HA providers like ZK need to be cleaned up manually after deletion.
- */
public DeleteControl shutdownAndDelete(
- String operatorNamespace,
- FlinkDeployment flinkApp,
- Context context,
- Configuration effectiveConfig) {
+ String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
- if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
- == JobManagerDeploymentStatus.READY) {
+ if (JobManagerDeploymentStatus.READY
+ == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
shutdown(flinkApp, effectiveConfig);
} else {
FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
}
Review comment:
Should we also call `FlinkUtils.waitForClusterShutdown()` here? As far as I
can see, `FlinkUtils.waitForClusterShutdown()` is only used for session
clusters. In `waitForClusterShutdown`, I notice that it waits until the JM and
other services are cleared. Why is `deleteCluster` not enough? If it cannot
fulfill our requirements, should we also wait for the other services to be
cleared in the `else` branch here?
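
To make the question concrete, here is a rough sketch of the kind of wait I
have in mind after the delete call. It is not the real `FlinkUtils` API:
`ShutdownWaitSketch`, `waitUntilGone` and the `resourcesGone` check are
hypothetical names, and the supplier only stands in for whatever check tells us
that the JM deployment and the other services are gone.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class ShutdownWaitSketch {

    /**
     * Polls until resourcesGone reports true or the timeout elapses.
     * Returns true if the resources disappeared in time, false otherwise.
     * The condition is always checked at least once.
     */
    public static boolean waitUntilGone(
            BooleanSupplier resourcesGone, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (resourcesGone.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for "JM deployment and services are gone":
        // reports true on the third check.
        int[] checks = {0};
        boolean gone =
                waitUntilGone(
                        () -> ++checks[0] >= 3,
                        Duration.ofSeconds(5),
                        Duration.ofMillis(10));
        System.out.println("gone=" + gone + " after " + checks[0] + " checks");
    }
}
```

In the `else` branch this would run right after `FlinkUtils.deleteCluster(...)`.
Whether we need it at all depends on whether `deleteCluster` already blocks
until the resources are removed, which is what I am unsure about.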