gyfora commented on code in PR #407:
URL: https://github.com/apache/flink-kubernetes-operator/pull/407#discussion_r1002922589


##########
docs/content/docs/custom-resource/job-management.md:
##########
@@ -241,6 +241,21 @@ In order this feature to work one must enable [recovery of missing job deploymen
 At the moment deployment is considered unhealthy when Flink's restarts count reaches `kubernetes.operator.cluster.health-check.restarts.threshold` (default: `64`)
 within time window of `kubernetes.operator.cluster.health-check.restarts.window` (default: 2 minutes).
 
+## Restart failed job deployments
+
+The operator can restart a failed Flink cluster deployment. This could be useful in cases when the job main task is
+able to reconfigure the job to handle these failures.
+
+For example a job could dynamically create the DAG based on some job configuration which job configuration could
+change over time. When a task detects a record which could not be handled with the current configuration then the task
+should throw a `SuppressRestartsException` to fail the job. If `kubernetes.operator.cluster.restart.failed` is set to
+`true` (default: `false`) then the operator detects the failed job and restarts it. When the job restarts then it reads
+the new job configuration and creates the new DAG based on this new configuration. The new deployment could handle the
+incoming records and no manual intervention is needed.

Review Comment:
   Let's remove this paragraph; it is very use-case specific.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -339,4 +339,11 @@ public static String operatorConfigKey(String key) {
                     .withDescription(
                             "The threshold which is checked against job restart count within a configured window. "
                                     + "If the restart count is reaching the threshold then full cluster restart is initiated.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> OPERATOR_CLUSTER_RESTART_FAILED =
+            operatorConfig("cluster.restart.failed")

Review Comment:
   This should be:
   
   ```
   job.restart.failed
   ```
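
To make the effect of the rename concrete, here is a minimal sketch of how the full key would come out. It assumes `operatorConfigKey` simply prepends `kubernetes.operator.`, which is inferred from the documented key names rather than taken from the actual implementation; `OperatorKeySketch` is a hypothetical stand-in class.

```java
// Hypothetical, self-contained sketch; not the operator's actual source.
final class OperatorKeySketch {
    // Assumed prefix, inferred from keys such as
    // kubernetes.operator.cluster.health-check.restarts.window.
    static final String PREFIX = "kubernetes.operator.";

    // Builds the fully qualified option key from its short suffix.
    static String operatorConfigKey(String key) {
        return PREFIX + key;
    }
}
```

Under that assumption, `OperatorKeySketch.operatorConfigKey("job.restart.failed")` yields `kubernetes.operator.job.restart.failed`, which is the key the docs hunk would then need to reference instead of `kubernetes.operator.cluster.restart.failed`.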



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -262,26 +264,44 @@ public boolean reconcileOtherChanges(
             return true;
         }
 
+        if (JobStatus.valueOf(deployment.getStatus().getJobStatus().getState()) == JobStatus.FAILED

Review Comment:
   Maybe we could move this logic up one level to the `AbstractJobReconciler`, since this logic and config make sense for both Applications and SessionJobs.
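
A rough sketch of the shape this could take, to illustrate the suggestion only. Every type and method name below (`SketchJobState`, `AbstractJobReconcilerSketch`, `restartIfFailed`, `restartJob`) is hypothetical and simplified; none of it mirrors the real reconciler signatures. The idea is that the failed-state check lives once in the shared base class and each resource type only supplies its own restart action.

```java
// Hypothetical sketch: none of these types are the operator's real classes.
enum SketchJobState { RUNNING, FAILED }

abstract class AbstractJobReconcilerSketch {
    // Stands in for the boolean restart option discussed in this review.
    private final boolean restartFailedJobs;
    // Counts triggered restarts so the behavior can be observed.
    int restartCount = 0;

    AbstractJobReconcilerSketch(boolean restartFailedJobs) {
        this.restartFailedJobs = restartFailedJobs;
    }

    // Shared check, written once for every job type.
    // Returns true when a restart was triggered.
    final boolean restartIfFailed(SketchJobState state) {
        if (restartFailedJobs && state == SketchJobState.FAILED) {
            restartJob();
            restartCount++;
            return true;
        }
        return false;
    }

    // Each resource type supplies its own way of bringing the job back.
    protected abstract void restartJob();
}

final class ApplicationReconcilerSketch extends AbstractJobReconcilerSketch {
    ApplicationReconcilerSketch(boolean restartFailedJobs) {
        super(restartFailedJobs);
    }

    @Override
    protected void restartJob() {
        System.out.println("redeploying application cluster");
    }
}

final class SessionJobReconcilerSketch extends AbstractJobReconcilerSketch {
    SessionJobReconcilerSketch(boolean restartFailedJobs) {
        super(restartFailedJobs);
    }

    @Override
    protected void restartJob() {
        System.out.println("resubmitting session job");
    }
}
```

With this layout, both reconcilers pick up the same config-driven behavior, and disabling the option turns the check into a no-op for either resource type.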



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
