SteNicholas commented on a change in pull request #26:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817487152
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,81 +47,22 @@ public BaseReconciler(KubernetesClient kubernetesClient,
FlinkService flinkServi
this.flinkService = flinkService;
}
- public boolean removeDeployment(FlinkDeployment flinkApp) {
- return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
- }
-
public abstract UpdateControl<FlinkDeployment> reconcile(
String operatorNamespace,
FlinkDeployment flinkApp,
Context context,
Configuration effectiveConfig)
throws Exception;
- protected JobManagerDeploymentStatus checkJobManagerDeployment(
- FlinkDeployment flinkApp, Context context, Configuration
effectiveConfig) {
- if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
- Optional<Deployment> deployment =
context.getSecondaryResource(Deployment.class);
- if (deployment.isPresent()) {
- DeploymentStatus status = deployment.get().getStatus();
- DeploymentSpec spec = deployment.get().getSpec();
- if (status != null
- && status.getAvailableReplicas() != null
- && spec.getReplicas().intValue() ==
status.getReplicas()
- && spec.getReplicas().intValue() ==
status.getAvailableReplicas()) {
- // typically it takes a few seconds for the REST server to
be ready
- if (flinkService.isJobManagerPortReady(effectiveConfig)) {
- LOG.info(
- "JobManager deployment {} in namespace {} is
ready",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace());
-
jobManagerDeployments.add(flinkApp.getMetadata().getUid());
- if (flinkApp.getStatus().getJobStatus() != null) {
- // pre-existing deployments on operator restart -
proceed with
- // reconciliation
- return JobManagerDeploymentStatus.READY;
- }
- }
- LOG.info(
- "JobManager deployment {} in namespace {} port not
ready",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace());
- return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
- }
- LOG.info(
- "JobManager deployment {} in namespace {} not yet
ready, status {}",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace(),
- status);
-
- return JobManagerDeploymentStatus.DEPLOYING;
- }
- return JobManagerDeploymentStatus.MISSING;
- }
- return JobManagerDeploymentStatus.READY;
- }
-
- /**
- * Shuts down the job and deletes all kubernetes resources including k8s
HA resources. It will
- * first perform a graceful shutdown (cancel) before deleting the data if
that is unsuccessful
- * in order to avoid leaking HA metadata to durable storage.
- *
- * <p>This feature is limited at the moment to cleaning up native
kubernetes HA resources, other
- * HA providers like ZK need to be cleaned up manually after deletion.
- */
public DeleteControl shutdownAndDelete(
- String operatorNamespace,
- FlinkDeployment flinkApp,
- Context context,
- Configuration effectiveConfig) {
+ String operatorNamespace, FlinkDeployment flinkApp, Configuration
effectiveConfig) {
- if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
- == JobManagerDeploymentStatus.READY) {
+ if (JobManagerDeploymentStatus.READY
+ == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
shutdown(flinkApp, effectiveConfig);
Review comment:
Does this need to delete the cluster after shutdown?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]