wangyang0918 commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866602275
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -57,7 +55,7 @@ public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ct
clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig));
} catch (Exception e) {
LOG.error("Exception while listing jobs", e);
- jobStatus.setState(JOB_STATE_UNKNOWN);
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
Review Comment:
I am still hesitant to replace `UNKNOWN` with `RECONCILING`, since we would then be unable to tell whether the job is actually reconciling or whether we simply failed to retrieve the job status.
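One way to keep the two cases distinguishable, sketched under assumptions: record a dedicated sentinel when the job listing fails, and only store Flink's own state names when the cluster actually reported them. `ObserverStateSketch`, `observeState`, and the sentinel value `"UNKNOWN"` are hypothetical; a `Supplier` returning job id to state name stands in for `flinkService.listJobs(...)`.

```java
import java.util.Map;
import java.util.function.Supplier;

final class ObserverStateSketch {

    // Hypothetical sentinel; deliberately not a Flink JobStatus name, so a failed
    // lookup can never be confused with a job that Flink reports as RECONCILING.
    static final String JOB_STATE_UNKNOWN = "UNKNOWN";

    // listJobs stands in for flinkService.listJobs(...): job id -> Flink state name.
    static String observeState(Supplier<Map<String, String>> listJobs, String jobId) {
        final Map<String, String> statesById;
        try {
            statesById = listJobs.get();
        } catch (RuntimeException e) {
            // The cluster could not be queried: the job state is genuinely unknown.
            return JOB_STATE_UNKNOWN;
        }
        // Assumption: a job absent from the listing is also reported as unknown.
        return statesById.getOrDefault(jobId, JOB_STATE_UNKNOWN);
    }

    public static void main(String[] args) {
        // Flink itself reports RECONCILING: the state is passed through as-is.
        System.out.println(observeState(() -> Map.of("job-1", "RECONCILING"), "job-1"));
        // Listing fails: recorded as UNKNOWN rather than RECONCILING.
        System.out.println(observeState(() -> {
            throw new IllegalStateException("REST endpoint unreachable");
        }, "job-1"));
    }
}
```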
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -173,6 +174,16 @@ private void rollbackApplication(FlinkDeployment flinkApp) throws Exception {
flinkApp.getMetadata(), rollbackSpec, rollbackConfig, kubernetesClient);
}
+ private void recoverJmDeployment(FlinkDeployment deployment) throws Exception {
+ LOG.info("Missing Flink Cluster deployment, trying to recover...");
Review Comment:
We might also need to print the FlinkDeployment name here.
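A minimal sketch of such a log message, assuming the namespace and name are read from the resource metadata (e.g. `deployment.getMetadata().getName()` and `getNamespace()`, in the same way the hunk already uses `flinkApp.getMetadata()`). The helper `withResourceRef` is hypothetical; it only builds the string, so it fits this log line as well as the `onMissingDeployment` error log further down.

```java
final class DeploymentLogSketch {

    // Hypothetical helper: prefixes a message with namespace/name so that, with
    // many FlinkDeployments, the log shows which one is affected.
    static String withResourceRef(String namespace, String name, String message) {
        return String.format("[%s/%s] %s", namespace, name, message);
    }

    public static void main(String[] args) {
        System.out.println(withResourceRef("flink-apps", "basic-example",
                "Missing Flink Cluster deployment, trying to recover..."));
        System.out.println(withResourceRef("flink-apps", "basic-example",
                "Missing JobManager deployment"));
    }
}
```

With SLF4J the same effect is usually achieved with parameterized messages, e.g. `LOG.info("Missing Flink Cluster deployment for {}/{}, trying to recover...", namespace, name)`, where the `{}` placeholders are only formatted when the log level is enabled.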
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() {
}
// Adapt default rest service type from 1.15+
- if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) {
- effectiveConfig.set(
- REST_SERVICE_EXPOSED_TYPE,
- KubernetesConfigOptions.ServiceExposedType.ClusterIP);
- }
+ setDefaultConf(
+ REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
if (spec.getJob() != null) {
- if (!effectiveConfig.contains(CANCEL_ENABLE)) {
- // Set 'web.cancel.enable' to false for application deployments to avoid users
- // accidentally cancelling jobs.
- effectiveConfig.set(CANCEL_ENABLE, false);
- }
+ // Set 'web.cancel.enable' to false for application deployments to avoid users
+ // accidentally cancelling jobs.
+ setDefaultConf(CANCEL_ENABLE, false);
+
// With last-state upgrade mode, set the default value of
// 'execution.checkpointing.interval'
// to 5 minutes when HA is enabled.
- if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
- && !effectiveConfig.contains(
- ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) {
- effectiveConfig.set(
+ if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
+ setDefaultConf(
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
DEFAULT_CHECKPOINTING_INTERVAL);
}
+
+ // We need to keep the application clusters around for proper operator behaviour
+ effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+ if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {
Review Comment:
Why don't we need to submit a failed job when HA is disabled?
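For reference, the removed lines in this hunk all follow the same "set only if the user has not configured it" pattern that `setDefaultConf` replaces, while `SHUTDOWN_ON_APPLICATION_FINISH` is still set unconditionally and therefore overrides any user value. The helper's body is not part of this hunk, so the following is only a sketch of those put-if-absent semantics, using a `HashMap` keyed by option name as a stand-in for Flink's `Configuration`; `DefaultsSketch` and this `setDefaultConf` signature are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class DefaultsSketch {
    // Stand-in for Flink's Configuration: option key -> value.
    private final Map<String, Object> effectiveConfig = new HashMap<>();

    // Sets the value only when the key is not configured yet, mirroring the
    // removed `if (!effectiveConfig.contains(...)) effectiveConfig.set(...)` blocks.
    void setDefaultConf(String key, Object defaultValue) {
        effectiveConfig.putIfAbsent(key, defaultValue);
    }

    // Unconditional override, as the diff does for SHUTDOWN_ON_APPLICATION_FINISH.
    void set(String key, Object value) {
        effectiveConfig.put(key, value);
    }

    Object get(String key) {
        return effectiveConfig.get(key);
    }

    public static void main(String[] args) {
        DefaultsSketch conf = new DefaultsSketch();
        conf.set("web.cancel.enable", true);                              // user-provided value
        conf.setDefaultConf("web.cancel.enable", false);                  // ignored: already set
        conf.setDefaultConf("execution.checkpointing.interval", "5 min"); // applied
        System.out.println(conf.get("web.cancel.enable"));                // true
        System.out.println(conf.get("execution.checkpointing.interval")); // 5 min
    }
}
```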
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
&& lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
}
+ private void onMissingDeployment(FlinkDeployment deployment) {
+ String err = "Missing JobManager deployment";
+ logger.error(err);
+ Event event =
+ DeploymentFailedException.asEvent(
+ new DeploymentFailedException(
+ DeploymentFailedException.COMPONENT_JOBMANAGER, "Error", err),
+ deployment);
+ kubernetesClient
Review Comment:
The age of the "Missing JobManager deployment" event is shown as `<unknown>`:
```
Events:
  Type   Reason                         Age        From                  Message
  ----   ------                         ----       ----                  -------
  Error  Missing JobManager deployment  <unknown>  JobManagerDeployment  Missing JobManager deployment
```
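As far as I can tell, `kubectl describe` computes the Age column from the event's `eventTime`, falling back to `firstTimestamp` (and uses `lastTimestamp` for repeated events), and prints `<unknown>` when those are unset. So the event created in `onMissingDeployment` probably needs its timestamps filled in. A minimal, dependency-free sketch, assuming the timestamps are carried as RFC 3339 strings as in the Kubernetes API; `EventFields` and `newEvent` are hypothetical and are not the fabric8 `EventBuilder` API.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

final class EventTimestampSketch {

    // Hypothetical stand-in for the timestamp-related fields of a core/v1 Event.
    static final class EventFields {
        String message;
        String firstTimestamp;
        String lastTimestamp;
        int count;
    }

    // Kubernetes serializes `Time` as RFC 3339 with second precision,
    // e.g. "2022-05-06T10:15:30Z".
    static String toK8sTime(Instant instant) {
        return DateTimeFormatter.ISO_INSTANT.format(instant.truncatedTo(ChronoUnit.SECONDS));
    }

    // Fills in the timestamps that kubectl uses to compute the Age column.
    static EventFields newEvent(String message, Instant now) {
        String ts = toK8sTime(now);
        EventFields event = new EventFields();
        event.message = message;
        event.firstTimestamp = ts; // first time the condition was observed
        event.lastTimestamp = ts;  // most recent occurrence
        event.count = 1;
        return event;
    }

    public static void main(String[] args) {
        EventFields event = newEvent("Missing JobManager deployment",
                Instant.parse("2022-05-06T10:15:30.123Z"));
        System.out.println(event.firstTimestamp); // 2022-05-06T10:15:30Z
    }
}
```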
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
&& lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
}
+ private void onMissingDeployment(FlinkDeployment deployment) {
+ String err = "Missing JobManager deployment";
+ logger.error(err);
Review Comment:
It would be great if we also printed the name of the FlinkDeployment whose JobManager deployment is missing.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,40 @@ public static boolean shouldRollBack(
.minus(readinessTimeout)
.isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
}
+
+ public static boolean deploymentRecoveryEnabled(Configuration conf) {
+ return conf.getOptional(
+ KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+ .orElse(
+ conf.get(FlinkConfigBuilder.FLINK_VERSION)
+ .isNewerVersionThan(FlinkVersion.v1_14)
+ ? true
+ : false);
+ }
+
+ public static boolean jobManagerMissingForRunningDeployment(FlinkDeploymentStatus status) {
+ return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState()
+ == JobState.RUNNING
+ && status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING;
+ }
+
+ public static boolean isJobInTerminalState(FlinkDeploymentStatus status) {
+ JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus();
+ if (deploymentStatus == JobManagerDeploymentStatus.MISSING) {
+ return true;
+ }
+
+ String jobState = status.getJobStatus().getState();
+
+ return deploymentStatus == JobManagerDeploymentStatus.READY
+ && org.apache.flink.api.common.JobStatus.valueOf(jobState).isTerminalState();
Review Comment:
I am afraid we might need to use `isGloballyTerminalState` here.
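The difference matters for `SUSPENDED`: in Flink's `org.apache.flink.api.common.JobStatus`, `isTerminalState()` is true both for the globally terminal states (`FAILED`, `CANCELED`, `FINISHED`) and for the locally terminal `SUSPENDED`, whereas `isGloballyTerminalState()` is true only for the former. A suspended job may still be resumed (e.g. after an HA failover), so treating it as terminal here could be premature. The enum below is a simplified, self-contained copy of that classification for illustration, not the Flink class itself. Separately, `JobStatus.valueOf(jobState)` throws for `null` or any string that is not a `JobStatus` constant name, which could matter if the observer ever stores a sentinel such as `JOB_STATE_UNKNOWN` (if its value is not a `JobStatus` name); the `isGloballyTerminal` helper is a hypothetical way to guard against that.

```java
final class TerminalStateSketch {

    // Simplified, self-contained copy of the terminal-state classification in
    // Flink's org.apache.flink.api.common.JobStatus (illustrative only).
    enum JobStatusSketch {
        INITIALIZING(TerminalState.NON_TERMINAL),
        CREATED(TerminalState.NON_TERMINAL),
        RUNNING(TerminalState.NON_TERMINAL),
        FAILING(TerminalState.NON_TERMINAL),
        FAILED(TerminalState.GLOBALLY),
        CANCELLING(TerminalState.NON_TERMINAL),
        CANCELED(TerminalState.GLOBALLY),
        FINISHED(TerminalState.GLOBALLY),
        RESTARTING(TerminalState.NON_TERMINAL),
        SUSPENDED(TerminalState.LOCALLY),
        RECONCILING(TerminalState.NON_TERMINAL);

        private enum TerminalState { NON_TERMINAL, LOCALLY, GLOBALLY }

        private final TerminalState terminalState;

        JobStatusSketch(TerminalState terminalState) {
            this.terminalState = terminalState;
        }

        // True for FAILED, CANCELED, FINISHED and also for SUSPENDED.
        boolean isTerminalState() {
            return terminalState != TerminalState.NON_TERMINAL;
        }

        // True only for FAILED, CANCELED and FINISHED.
        boolean isGloballyTerminalState() {
            return terminalState == TerminalState.GLOBALLY;
        }
    }

    // Hypothetical helper: treats null or non-JobStatus strings (e.g. an
    // "UNKNOWN" sentinel) as not terminal instead of letting valueOf(...) throw.
    static boolean isGloballyTerminal(String state) {
        if (state == null) {
            return false;
        }
        try {
            return JobStatusSketch.valueOf(state).isGloballyTerminalState();
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (JobStatusSketch s : JobStatusSketch.values()) {
            if (s.isTerminalState() && !s.isGloballyTerminalState()) {
                // prints: SUSPENDED is terminal but not globally terminal
                System.out.println(s + " is terminal but not globally terminal");
            }
        }
    }
}
```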
--
This is an automated message from the Apache Git Service.