gyfora commented on code in PR #264:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/264#discussion_r893110295
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -145,12 +145,25 @@ public class KubernetesOperatorConfigOptions {
.withDescription(
"Whether to enable recovery of missing/deleted
jobmanager deployments.");
+ public static final ConfigOption<Integer>
OPERATOR_SAVEPOINT_HISTORY_COUNT_THRESHOLD =
+
ConfigOptions.key("kubernetes.operator.savepoint.history.count.threshold")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Number threshold of savepoint history
entries to retain.");
+
public static final ConfigOption<Integer>
OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT =
ConfigOptions.key("kubernetes.operator.savepoint.history.max.count")
.intType()
.defaultValue(10)
.withDescription("Maximum number of savepoint history
entries to retain.");
+ public static final ConfigOption<Duration>
OPERATOR_SAVEPOINT_HISTORY_AGE_THRESHOLD =
+
ConfigOptions.key("kubernetes.operator.savepoint.history.age.threshold")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "Age threshold for savepoint history entries to
retain. Due to lazy clean-up, the most recent savepoint may live longer than
the max age.");
+
Review Comment:
I think the new config keys should be:
`OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() + ".threshold"`
`OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key() + ".threshold"`
To make this straightforward to users and admins
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -148,13 +149,38 @@ void cleanupSavepointHistory(
// maintain history
List<Savepoint> savepointHistory =
currentSavepointInfo.getSavepointHistory();
- int maxCount =
configManager.getOperatorConfiguration().getSavepointHistoryMaxCount();
+ Integer countThreshold =
+
configManager.getOperatorConfiguration().getSavepointHistoryCountThreshold();
+ int maxCount =
+ deployedConfig.get(
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT);
+ if (countThreshold != null && maxCount > countThreshold) {
+ LOG.warn(
+ "Maximum number of savepoint history entries to retain
changes to {}. The value of '{}' exceeds the value of '{}'",
+ countThreshold,
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key(),
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_COUNT_THRESHOLD
+ .key());
+ maxCount = countThreshold;
+ }
Review Comment:
Can we refactor this pattern into a utility method?
```
public static <T extends Comparable<T>> T
getValueWithThreshold(ConfigOption<T> option, ConfigOption<T> threshold,
Configuraiton config) {
...
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]