wangyang0918 commented on code in PR #195:
URL: https://github.com/apache/flink-kubernetes-operator/pull/195#discussion_r866602275


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -57,7 +55,7 @@ public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ct
             clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig));
         } catch (Exception e) {
             LOG.error("Exception while listing jobs", e);
-            jobStatus.setState(JOB_STATE_UNKNOWN);
+            jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());

Review Comment:
   I am still hesitant to replace `UNKNOWN` with `RECONCILING`, since we could 
then no longer tell whether the job is really reconciling or whether we simply 
failed to get the job status.
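
To illustrate the concern, here is a minimal sketch with stand-in types. `ObservedState` and `ObserveSketch` are hypothetical names for this example and are not the operator's classes. If a failed lookup is mapped to `RECONCILING`, the caller sees the same value as for a job that really is reconciling; a dedicated `UNKNOWN` value keeps the two cases apart.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins for illustration only; not the operator's real types.
enum ObservedState { RUNNING, RECONCILING, UNKNOWN }

final class ObserveSketch {
    // Returns the listed state, or the given fallback if the lookup throws.
    static ObservedState observe(Supplier<ObservedState> lister, ObservedState onFailure) {
        try {
            return lister.get();
        } catch (RuntimeException e) {
            return onFailure;
        }
    }

    public static void main(String[] args) {
        Supplier<ObservedState> reconciling = () -> ObservedState.RECONCILING;
        Supplier<ObservedState> unreachable = () -> {
            throw new IllegalStateException("could not list jobs");
        };

        // With RECONCILING as the fallback, both cases yield the same state.
        System.out.println(observe(reconciling, ObservedState.RECONCILING));
        System.out.println(observe(unreachable, ObservedState.RECONCILING));

        // With UNKNOWN as the fallback, the failed lookup stays distinguishable.
        System.out.println(observe(unreachable, ObservedState.UNKNOWN));
    }
}
```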



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -173,6 +174,16 @@ private void rollbackApplication(FlinkDeployment flinkApp) throws Exception {
                 flinkApp.getMetadata(), rollbackSpec, rollbackConfig, kubernetesClient);
     }
 
+    private void recoverJmDeployment(FlinkDeployment deployment) throws Exception {
+        LOG.info("Missing Flink Cluster deployment, trying to recover...");

Review Comment:
   We might also need to print the FlinkDeployment name here.
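
Something along these lines, for example. This is only a sketch: `RecoveryLogSketch` and `recoveryMessage` are hypothetical names, and in the operator the two values would presumably come from `deployment.getMetadata().getNamespace()` and `deployment.getMetadata().getName()`.

```java
// Hypothetical helper for illustration; the real code would pass the values
// from deployment.getMetadata() straight to the logger.
final class RecoveryLogSketch {
    // Builds a log message that identifies the affected FlinkDeployment.
    static String recoveryMessage(String namespace, String name) {
        return String.format(
                "Missing Flink Cluster deployment for %s/%s, trying to recover...",
                namespace, name);
    }

    public static void main(String[] args) {
        // "default" and "basic-example" are made-up values.
        System.out.println(recoveryMessage("default", "basic-example"));
    }
}
```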



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -112,30 +132,31 @@ protected FlinkConfigBuilder applyFlinkConfiguration() {
         }
 
         // Adapt default rest service type from 1.15+
-        if (!effectiveConfig.contains(REST_SERVICE_EXPOSED_TYPE)) {
-            effectiveConfig.set(
-                    REST_SERVICE_EXPOSED_TYPE,
-                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
-        }
+        setDefaultConf(
+                REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
 
         if (spec.getJob() != null) {
-            if (!effectiveConfig.contains(CANCEL_ENABLE)) {
-                // Set 'web.cancel.enable' to false for application deployments to avoid users
-                // accidentally cancelling jobs.
-                effectiveConfig.set(CANCEL_ENABLE, false);
-            }
+            // Set 'web.cancel.enable' to false for application deployments to avoid users
+            // accidentally cancelling jobs.
+            setDefaultConf(CANCEL_ENABLE, false);
+
             // With last-state upgrade mode, set the default value of
             // 'execution.checkpointing.interval'
             // to 5 minutes when HA is enabled.
-            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
-                    && !effectiveConfig.contains(
-                            ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) {
-                effectiveConfig.set(
+            if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
+                setDefaultConf(
                         ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                         DEFAULT_CHECKPOINTING_INTERVAL);
             }
+
+            // We need to keep the application clusters around for proper operator behaviour
+            effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
+            if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {

Review Comment:
   Why don't we need to submit a failed job when HA is disabled?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);
+        Event event =
+                DeploymentFailedException.asEvent(
+                        new DeploymentFailedException(
+                                DeploymentFailedException.COMPONENT_JOBMANAGER, "Error", err),
+                        deployment);
+        kubernetesClient

Review Comment:
   The age of the missing deployment event is shown as `<unknown>`:
   
   ```
   Events:
     Type     Reason                         Age        From                  Message
     ----     ------                         ----       ----                  -------
     Error    Missing JobManager deployment  <unknown>  JobManagerDeployment  Missing JobManager deployment
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -211,6 +220,22 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) {
                 && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED;
     }
 
+    private void onMissingDeployment(FlinkDeployment deployment) {
+        String err = "Missing JobManager deployment";
+        logger.error(err);

Review Comment:
   It would be great if we also printed the name of the missing FlinkDeployment.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -255,4 +259,40 @@ public static boolean shouldRollBack(
                 .minus(readinessTimeout)
                 .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()));
     }
+
+    public static boolean deploymentRecoveryEnabled(Configuration conf) {
+        return conf.getOptional(
+                        KubernetesOperatorConfigOptions.OPERATOR_RECOVER_JM_DEPLOYMENT_ENABLED)
+                .orElse(
+                        conf.get(FlinkConfigBuilder.FLINK_VERSION)
+                                        .isNewerVersionThan(FlinkVersion.v1_14)
+                                ? true
+                                : false);
+    }
+
+    public static boolean jobManagerMissingForRunningDeployment(FlinkDeploymentStatus status) {
+        return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState()
+                        == JobState.RUNNING
+                && status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING;
+    }
+
+    public static boolean isJobInTerminalState(FlinkDeploymentStatus status) {
+        JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus();
+        if (deploymentStatus == JobManagerDeploymentStatus.MISSING) {
+            return true;
+        }
+
+        String jobState = status.getJobStatus().getState();
+
+        return deploymentStatus == JobManagerDeploymentStatus.READY
+                && org.apache.flink.api.common.JobStatus.valueOf(jobState).isTerminalState();

Review Comment:
   I am afraid we might need to use `isGloballyTerminalState` here.
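
For context, a minimal sketch of the difference. `JobStatusSketch` is a simplified stand-in that mirrors how Flink's `JobStatus` classifies a few of its states; it is not the real class and omits most states. The relevant case is `SUSPENDED`, which is only locally terminal: `isTerminalState()` returns true for it, while `isGloballyTerminalState()` does not.

```java
// Simplified stand-in for org.apache.flink.api.common.JobStatus, for
// illustration only; the real enum has more states.
enum JobStatusSketch {
    RUNNING(Terminal.NON_TERMINAL),
    SUSPENDED(Terminal.LOCALLY),
    FAILED(Terminal.GLOBALLY),
    CANCELED(Terminal.GLOBALLY),
    FINISHED(Terminal.GLOBALLY);

    enum Terminal { NON_TERMINAL, LOCALLY, GLOBALLY }

    private final Terminal terminal;

    JobStatusSketch(Terminal terminal) {
        this.terminal = terminal;
    }

    // True for locally and globally terminal states.
    boolean isTerminalState() {
        return terminal != Terminal.NON_TERMINAL;
    }

    // True only for globally terminal states.
    boolean isGloballyTerminalState() {
        return terminal == Terminal.GLOBALLY;
    }
}

final class TerminalStateDemo {
    public static void main(String[] args) {
        for (JobStatusSketch s : JobStatusSketch.values()) {
            System.out.println(
                    s + " terminal=" + s.isTerminalState()
                            + " globallyTerminal=" + s.isGloballyTerminalState());
        }
    }
}
```

With `isTerminalState`, a `SUSPENDED` job would be treated the same as a `FINISHED` or `FAILED` one.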



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
