wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815556799
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
flinkApp.getMetadata().getName(),
flinkApp.getMetadata().getNamespace());
jobManagerDeployments.add(flinkApp.getMetadata().getUid());
- if (flinkApp.getStatus().getJobStatus() != null) {
- // pre-existing deployments on operator restart - proceed with
- // reconciliation
- return null;
- }
+ return JobDeploymentStatus.READY;
}
LOG.info(
"JobManager deployment {} in namespace {} port not
ready",
flinkApp.getMetadata().getName(),
flinkApp.getMetadata().getNamespace());
- return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+ return JobDeploymentStatus.DEPLOYED_NOT_READY;
}
LOG.info(
"JobManager deployment {} in namespace {} not yet
ready, status {}",
flinkApp.getMetadata().getName(),
flinkApp.getMetadata().getNamespace(),
status);
- // TODO: how frequently do we want here
- return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+ return JobDeploymentStatus.DEPLOYING;
}
}
- return null;
+ return JobDeploymentStatus.MISSING;
Review comment:
The `MISSING` value is confusing here. In my mind, `MISSING` means the
JobManager deployment could not be found, and it should also trigger a
reconciliation after `REFRESH_SECONDS`.
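To make the suggestion concrete, here is a minimal sketch (hypothetical names; the delay values for `PORT_READY_DELAY_SECONDS` and `REFRESH_SECONDS` are assumed, not taken from the PR) of how each `JobDeploymentStatus` could map to a reschedule delay, with `MISSING` also polling again after `REFRESH_SECONDS` instead of stalling:

```java
// Hypothetical sketch: map each JobDeploymentStatus to a reschedule delay.
// The point is that MISSING reschedules after REFRESH_SECONDS rather than
// silently returning null and never reconciling again.
enum JobDeploymentStatus {
    READY,              // port ready, proceed with reconciliation now
    DEPLOYED_NOT_READY, // deployment exists but the port is not ready yet
    DEPLOYING,          // deployment is still coming up
    MISSING;            // deployment not found - retry, do not stall

    static final long PORT_READY_DELAY_SECONDS = 10; // assumed value
    static final long REFRESH_SECONDS = 60;          // assumed value

    long rescheduleAfterSeconds() {
        switch (this) {
            case READY:
                return 0; // reconcile immediately
            case DEPLOYED_NOT_READY:
                return PORT_READY_DELAY_SECONDS;
            default: // DEPLOYING and MISSING both poll again later
                return REFRESH_SECONDS;
        }
    }
}
```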
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -86,16 +85,14 @@ public FlinkDeploymentController(
@Override
public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
- LOG.info("Cleaning up application cluster {}",
flinkApp.getMetadata().getName());
- FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
- IngressUtils.updateIngressRules(
- flinkApp,
- FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()),
- operatorNamespace,
- kubernetesClient,
- true);
+ LOG.info("Stopping cluster {}", flinkApp.getMetadata().getName());
getReconciler(flinkApp).removeDeployment(flinkApp);
Review comment:
We already remove the deployment in
`BaseReconciler#shutdownAndDelete()`. Right?
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -74,11 +72,14 @@ public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkServic
Optional.ofNullable(jobSpec.getInitialSavepointPath()));
IngressUtils.updateIngressRules(
flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
+ return UpdateControl.updateStatus(flinkApp)
Review comment:
Maybe we need to return `JobDeploymentStatus.DEPLOYING.toUpdateControl(flinkApp)`
here, since we just called `deployFlinkJob` above and the JobManager
deployment is still deploying.
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -179,4 +180,22 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
savepointOpt.ifPresent(jobStatus::setSavepointLocation);
return savepointOpt;
}
+
+ @Override
+ protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+ if (flinkApp.getStatus().getJobStatus() != null
+ && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+ try {
+ flinkService.cancelJob(
+ JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
+ UpgradeMode.STATELESS,
+ effectiveConfig);
+ return;
+ } catch (Exception e) {
+ LOG.error("Could not shut down cluster gracefully,
deleting...", e);
+ }
+ }
+
+ FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
Review comment:
Do we need to manually delete the cluster only when the cancellation fails?
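As a plain control-flow sketch of the question (stub types, no Flink dependencies; `Canceller` and the string return values are stand-ins for `flinkService.cancelJob` and the real side effects), the forced deletion in the diff is only reached when there is no job id or the graceful cancel throws:

```java
// Stand-in sketch of the shutdown() control flow under discussion:
// try a graceful cancel first; fall through to forced cluster deletion
// only when there is no job id or cancellation throws.
class ShutdownSketch {
    interface Canceller {
        void cancelJob(String jobId) throws Exception;
    }

    static String shutdown(String jobId, Canceller canceller) {
        if (jobId != null) {
            try {
                canceller.cancelJob(jobId);
                return "cancelled"; // graceful path: no manual deletion
            } catch (Exception e) {
                // log and fall through to forced deletion
            }
        }
        return "deleted"; // fallback: delete the cluster directly
    }
}
```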
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -171,14 +171,14 @@ public boolean isJobManagerPortReady(Configuration config) {
return savepointOpt;
}
- public void stopSessionCluster(FlinkDeployment deployment, Configuration conf)
- throws Exception {
- FlinkUtils.deleteCluster(deployment, kubernetesClient);
+ public void stopSessionCluster(
+ FlinkDeployment deployment, Configuration conf, boolean deleteHaData) {
+ FlinkUtils.deleteCluster(deployment, kubernetesClient, deleteHaData);
waitForClusterShutdown(conf);
}
/** We need this due to the buggy flink kube cluster client behaviour for now. */
Review comment:
Out of scope for this PR:
I am curious what the bug in the Flink kube cluster client is, so that we
could fix it in the upstream project.
##########
File path:
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,11 +65,8 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
Configuration effectiveConfig)
throws Exception;
- protected UpdateControl<FlinkDeployment> checkJobManagerDeployment(
- FlinkDeployment flinkApp,
- Context context,
- Configuration effectiveConfig,
- FlinkService flinkService) {
+ protected JobDeploymentStatus checkJobManagerDeployment(
+ FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
Review comment:
Out of scope for this PR:
Could we check the JobManager deployment on every reconciliation instead of
using the cached `jobManagerDeployments`? We are already using an informer to
cache the Deployments locally, so it is not a heavy operation. Then we could
also handle the JobManager pod crash scenario.
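A toy sketch of this idea (a `Map` stands in for the informer-backed local cache, and all names are hypothetical; the real check would go through the fabric8 client): looking the Deployment up on every reconciliation means a crashed or deleted JobManager shows up as missing, instead of staying "known" forever in a long-lived set:

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch: the Map plays the role of the informer-backed local cache of
// Deployments. Checking it on every reconciliation (instead of consulting a
// long-lived jobManagerDeployments set) lets a crashed JobManager be detected.
class DeploymentLookup {
    private final Map<String, Boolean> informerCache = new HashMap<>();

    void onDeploymentAdded(String uid)   { informerCache.put(uid, true); }
    void onDeploymentDeleted(String uid) { informerCache.remove(uid); }

    // Cheap per-reconciliation check: a local cache read, no API server call.
    boolean jobManagerDeploymentExists(String uid) {
        return informerCache.containsKey(uid);
    }
}
```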
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]