gyfora commented on code in PR #513:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/513#discussion_r1090351779


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -379,6 +379,24 @@ public static String operatorConfigKey(String key) {
                             "The threshold which is checked against job 
restart count within a configured window. "
                                     + "If the restart count is reaching the 
threshold then full cluster restart is initiated.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean>
+            OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED =
+                    
operatorConfig("cluster.health-check.checkpoint-progress.enabled")
+                            .booleanType()
+                            .defaultValue(false)
+                            .withDescription(
+                                    "Whether to enable checkpoint progress 
health check for clusters.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Duration>
+            OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW =
+                    
operatorConfig("cluster.health-check.checkpoint-progress.window")
+                            .durationType()
+                            .defaultValue(Duration.ofMinutes(5))
+                            .withDescription(
+                                    "The duration of the time window where job 
completed checkpoint count measured. This must be bigger than checkpointing 
interval.");

Review Comment:
   This description should be something like:
   ```
   If no checkpoints are completed within the defined time window, the job is 
considered unhealthy. This must be bigger than checkpointing interval.
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java:
##########
@@ -75,72 +77,139 @@ public void evaluate(
             var lastValidClusterHealthInfo = 
getLastValidClusterHealthInfo(clusterInfo);
             if (lastValidClusterHealthInfo == null) {
                 LOG.debug("No last valid health info, skipping health check");

Review Comment:
   I think we should clear all health info (restarts, checkpoint progress) 
during an upgrade/suspend, so we guarantee that it is reinitialized after the 
jm starts after the upgrade.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java:
##########
@@ -75,72 +77,139 @@ public void evaluate(
             var lastValidClusterHealthInfo = 
getLastValidClusterHealthInfo(clusterInfo);
             if (lastValidClusterHealthInfo == null) {
                 LOG.debug("No last valid health info, skipping health check");
+                observedClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
+                        observedClusterHealthInfo.getTimeStamp());
+                
observedClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                        observedClusterHealthInfo.getTimeStamp());
                 setLastValidClusterHealthInfo(clusterInfo, 
observedClusterHealthInfo);
             } else if (observedClusterHealthInfo.getTimeStamp()
                     < lastValidClusterHealthInfo.getTimeStamp()) {
                 String msg =
                         "Observed health info timestamp is less than the last 
valid health info timestamp, this indicates a bug...";
                 LOG.error(msg);
                 throw new IllegalStateException(msg);
-            } else if (observedClusterHealthInfo.getNumRestarts()
-                    < lastValidClusterHealthInfo.getNumRestarts()) {
-                LOG.debug(
-                        "Observed health info number of restarts is less than 
the last valid health info number of restarts, skipping health check");
-                setLastValidClusterHealthInfo(clusterInfo, 
observedClusterHealthInfo);
             } else {
-                boolean isHealthy = true;
-
                 LOG.debug("Valid health info exist, checking cluster health");
                 LOG.debug("Last valid health info: {}", 
lastValidClusterHealthInfo);
                 LOG.debug("Observed health info: {}", 
observedClusterHealthInfo);
 
-                var timestampDiffMs =
-                        observedClusterHealthInfo.getTimeStamp()
-                                - lastValidClusterHealthInfo.getTimeStamp();
-                LOG.debug(
-                        "Time difference between health infos: {}",
-                        Duration.ofMillis(timestampDiffMs));
-
-                var restartCheckWindow =
-                        
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
-                var restartCheckWindowMs = restartCheckWindow.toMillis();
-                double countMultiplier = (double) restartCheckWindowMs / 
(double) timestampDiffMs;
-                // If the 2 health info timestamp difference is within the 
window then no
-                // scaling needed
-                if (countMultiplier > 1) {
-                    countMultiplier = 1;
-                }
-                long numRestarts =
-                        (long)
-                                ((double)
-                                                
(observedClusterHealthInfo.getNumRestarts()
-                                                        - 
lastValidClusterHealthInfo
-                                                                
.getNumRestarts())
-                                        * countMultiplier);
-                LOG.debug(
-                        "Calculated restart count for {} window: {}",
-                        restartCheckWindow,
-                        numRestarts);
-
-                if (lastValidClusterHealthInfo.getTimeStamp()
-                        < clock.millis() - restartCheckWindowMs) {
-                    LOG.debug("Last valid health info timestamp is outside of 
the window");
-                    setLastValidClusterHealthInfo(clusterInfo, 
observedClusterHealthInfo);
-                }
-
-                var restartThreshold =
-                        
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
-                if (numRestarts > restartThreshold) {
-                    LOG.info("Restart count hit threshold: {}", 
restartThreshold);
-                    setLastValidClusterHealthInfo(clusterInfo, 
observedClusterHealthInfo);
-                    isHealthy = false;
-                }
-
-                // Update the health flag
-                lastValidClusterHealthInfo = 
getLastValidClusterHealthInfo(clusterInfo);
+                boolean isHealthy =
+                        evaluateRestarts(
+                                configuration,
+                                clusterInfo,
+                                lastValidClusterHealthInfo,
+                                observedClusterHealthInfo);
+                isHealthy &=

Review Comment:
   let's use
   ```
   isHealthy = evaluteRestarts(...) && evaluateCheckpoints(...)
   ```
   for clarity



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to