gyfora commented on code in PR #264:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/264#discussion_r893110295


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -145,12 +145,25 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Whether to enable recovery of missing/deleted 
jobmanager deployments.");
 
+    public static final ConfigOption<Integer> 
OPERATOR_SAVEPOINT_HISTORY_COUNT_THRESHOLD =
+            
ConfigOptions.key("kubernetes.operator.savepoint.history.count.threshold")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Number threshold of savepoint history 
entries to retain.");
+
     public static final ConfigOption<Integer> 
OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT =
             
ConfigOptions.key("kubernetes.operator.savepoint.history.max.count")
                     .intType()
                     .defaultValue(10)
                     .withDescription("Maximum number of savepoint history 
entries to retain.");
 
+    public static final ConfigOption<Duration> 
OPERATOR_SAVEPOINT_HISTORY_AGE_THRESHOLD =
+            
ConfigOptions.key("kubernetes.operator.savepoint.history.age.threshold")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Age threshold for savepoint history entries to 
retain. Due to lazy clean-up, the most recent savepoint may live longer than 
the max age.");
+

Review Comment:
   I think the new config keys should be:
   `OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() + ".threshold"`
   `OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key() + ".threshold"`
   
   To make this straightforward to users and admins



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -148,13 +149,38 @@ void cleanupSavepointHistory(
 
         // maintain history
         List<Savepoint> savepointHistory = 
currentSavepointInfo.getSavepointHistory();
-        int maxCount = 
configManager.getOperatorConfiguration().getSavepointHistoryMaxCount();
+        Integer countThreshold =
+                
configManager.getOperatorConfiguration().getSavepointHistoryCountThreshold();
+        int maxCount =
+                deployedConfig.get(
+                        
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT);
+        if (countThreshold != null && maxCount > countThreshold) {
+            LOG.warn(
+                    "Maximum number of savepoint history entries to retain 
changes to {}. The value of '{}' exceeds the value of '{}'",
+                    countThreshold,
+                    
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key(),
+                    
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_COUNT_THRESHOLD
+                            .key());
+            maxCount = countThreshold;
+        }

Review Comment:
   Can we refactor this pattern into a utility method? 
   ```
   public static <T extends Comparable<T>> T 
getValueWithThreshold(ConfigOption<T> option, ConfigOption<T> threshold, 
Configuraiton config) {
   ...
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to