gaborgsomogyi commented on code in PR #513:
URL: https://github.com/apache/flink-kubernetes-operator/pull/513#discussion_r1090406757
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java:
##########
@@ -75,72 +77,139 @@ public void evaluate(
var lastValidClusterHealthInfo =
getLastValidClusterHealthInfo(clusterInfo);
if (lastValidClusterHealthInfo == null) {
LOG.debug("No last valid health info, skipping health check");
+ observedClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
+ observedClusterHealthInfo.getTimeStamp());
+
observedClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+ observedClusterHealthInfo.getTimeStamp());
setLastValidClusterHealthInfo(clusterInfo,
observedClusterHealthInfo);
} else if (observedClusterHealthInfo.getTimeStamp()
< lastValidClusterHealthInfo.getTimeStamp()) {
String msg =
"Observed health info timestamp is less than the last
valid health info timestamp, this indicates a bug...";
LOG.error(msg);
throw new IllegalStateException(msg);
- } else if (observedClusterHealthInfo.getNumRestarts()
- < lastValidClusterHealthInfo.getNumRestarts()) {
- LOG.debug(
- "Observed health info number of restarts is less than
the last valid health info number of restarts, skipping health check");
- setLastValidClusterHealthInfo(clusterInfo,
observedClusterHealthInfo);
} else {
- boolean isHealthy = true;
-
LOG.debug("Valid health info exist, checking cluster health");
LOG.debug("Last valid health info: {}",
lastValidClusterHealthInfo);
LOG.debug("Observed health info: {}",
observedClusterHealthInfo);
- var timestampDiffMs =
- observedClusterHealthInfo.getTimeStamp()
- - lastValidClusterHealthInfo.getTimeStamp();
- LOG.debug(
- "Time difference between health infos: {}",
- Duration.ofMillis(timestampDiffMs));
-
- var restartCheckWindow =
-
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
- var restartCheckWindowMs = restartCheckWindow.toMillis();
- double countMultiplier = (double) restartCheckWindowMs /
(double) timestampDiffMs;
- // If the 2 health info timestamp difference is within the
window then no
- // scaling needed
- if (countMultiplier > 1) {
- countMultiplier = 1;
- }
- long numRestarts =
- (long)
- ((double)
-
(observedClusterHealthInfo.getNumRestarts()
- -
lastValidClusterHealthInfo
-
.getNumRestarts())
- * countMultiplier);
- LOG.debug(
- "Calculated restart count for {} window: {}",
- restartCheckWindow,
- numRestarts);
-
- if (lastValidClusterHealthInfo.getTimeStamp()
- < clock.millis() - restartCheckWindowMs) {
- LOG.debug("Last valid health info timestamp is outside of
the window");
- setLastValidClusterHealthInfo(clusterInfo,
observedClusterHealthInfo);
- }
-
- var restartThreshold =
-
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
- if (numRestarts > restartThreshold) {
- LOG.info("Restart count hit threshold: {}",
restartThreshold);
- setLastValidClusterHealthInfo(clusterInfo,
observedClusterHealthInfo);
- isHealthy = false;
- }
-
- // Update the health flag
- lastValidClusterHealthInfo =
getLastValidClusterHealthInfo(clusterInfo);
+ boolean isHealthy =
+ evaluateRestarts(
+ configuration,
+ clusterInfo,
+ lastValidClusterHealthInfo,
+ observedClusterHealthInfo);
+ isHealthy &=
Review Comment:
Changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]