This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6df91ae1 [FLINK-31795] Add config to disable savepoint cleanup
6df91ae1 is described below

commit 6df91ae17873c274b2b993ab7bef30f1297e5c3e
Author: darenwkt <108749182+daren...@users.noreply.github.com>
AuthorDate: Wed May 3 16:39:16 2023 +0100

    [FLINK-31795] Add config to disable savepoint cleanup
---
 .../shortcodes/generated/dynamic_section.html      |  6 ++++
 .../kubernetes_operator_config_configuration.html  |  6 ++++
 .../config/FlinkOperatorConfiguration.java         |  2 +-
 .../config/KubernetesOperatorConfigOptions.java    |  7 ++++
 .../operator/observer/SavepointObserver.java       | 12 +++++--
 .../utils/FlinkResourceExceptionUtils.java         |  2 +-
 .../operator/observer/SavepointObserverTest.java   | 38 ++++++++++++++++++++++
 .../utils/FlinkResourceExceptionUtilsTest.java     |  4 +--
 8 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html
index 3179183f..9662e8f1 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -98,6 +98,12 @@
             <td>Boolean</td>
             <td>Configure the array merge behaviour during pod merging. Arrays 
can be either merged by position or name matching.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.savepoint.cleanup.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enable clean up of savepoint history.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.savepoint.format.type</h5></td>
             <td style="word-wrap: break-word;">CANONICAL</td>
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 5495b2af..7d87aa71 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -272,6 +272,12 @@
             <td>Integer</td>
             <td>Max attempts of automatic reconcile retries on recoverable 
errors.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.savepoint.cleanup.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enable clean up of savepoint history.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.savepoint.format.type</h5></td>
             <td style="word-wrap: break-word;">CANONICAL</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index b29077aa..f32e57d8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -62,7 +62,7 @@ public class FlinkOperatorConfiguration {
     Integer savepointHistoryCountThreshold;
     Duration savepointHistoryAgeThreshold;
     RetryConfiguration retryConfiguration;
-    Boolean exceptionStackTraceEnabled;
+    boolean exceptionStackTraceEnabled;
     int exceptionStackTraceLengthThreshold;
     int exceptionFieldLengthThreshold;
     int exceptionThrowableCountThreshold;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 820eb221..3b296758 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -189,6 +189,13 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Whether to enable recovery of missing/deleted 
jobmanager deployments.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> 
OPERATOR_SAVEPOINT_CLEANUP_ENABLED =
+            operatorConfig("savepoint.cleanup.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to enable clean up of savepoint 
history.");
+
     @Documentation.Section(SECTION_DYNAMIC)
     public static final ConfigOption<Integer> 
OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT =
             operatorConfig("savepoint.history.max.count")
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index 8b7042e1..fef3a0a0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -141,6 +141,9 @@ public class SavepointObserver<
             FlinkService flinkService,
             SavepointInfo currentSavepointInfo,
             Configuration observeConfig) {
+        final boolean savepointCleanupEnabled =
+                observeConfig.getBoolean(
+                        
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED);
 
         // maintain history
         List<Savepoint> savepointHistory = 
currentSavepointInfo.getSavepointHistory();
@@ -161,7 +164,10 @@ public class SavepointObserver<
                                         .getSavepointHistoryCountThreshold()));
         while (savepointHistory.size() > maxCount) {
             // remove oldest entries
-            disposeSavepointQuietly(flinkService, savepointHistory.remove(0), 
observeConfig);
+            Savepoint sp = savepointHistory.remove(0);
+            if (savepointCleanupEnabled) {
+                disposeSavepointQuietly(flinkService, sp, observeConfig);
+            }
         }
 
         Duration maxAge =
@@ -175,7 +181,9 @@ public class SavepointObserver<
             Savepoint sp = it.next();
             if (sp.getTimeStamp() < maxTms && sp != lastSavepoint) {
                 it.remove();
-                disposeSavepointQuietly(flinkService, sp, observeConfig);
+                if (savepointCleanupEnabled) {
+                    disposeSavepointQuietly(flinkService, sp, observeConfig);
+                }
             }
         }
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java
index fad62b6f..78b26414 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java
@@ -41,7 +41,7 @@ public final class FlinkResourceExceptionUtils {
     public static <R extends AbstractFlinkResource> void 
updateFlinkResourceException(
             Throwable throwable, R resource, FlinkOperatorConfiguration conf) {
 
-        boolean stackTraceEnabled = conf.getExceptionStackTraceEnabled();
+        boolean stackTraceEnabled = conf.isExceptionStackTraceEnabled();
         int stackTraceLengthThreshold = 
conf.getExceptionStackTraceLengthThreshold();
         int lengthThreshold = conf.getExceptionFieldLengthThreshold();
         int throwableCountThreshold = 
conf.getExceptionThrowableCountThreshold();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
index ea56aac4..2b1bf201 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -138,6 +139,43 @@ public class SavepointObserverTest extends OperatorTestBase {
         configManager.updateDefaultConfig(new Configuration());
     }
 
+    @Test
+    public void testDisabledDispose() {
+        Configuration conf = new Configuration();
+        
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED, 
false);
+        
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 
1000);
+        conf.set(
+                
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+                Duration.ofDays(100L));
+
+        configManager.updateDefaultConfig(conf);
+
+        SavepointInfo spInfo = new SavepointInfo();
+
+        Savepoint sp1 =
+                new Savepoint(
+                        9999999999999998L,
+                        "sp1",
+                        SavepointTriggerType.MANUAL,
+                        SavepointFormatType.CANONICAL,
+                        123L);
+        spInfo.updateLastSavepoint(sp1);
+        observer.cleanupSavepointHistory(flinkService, spInfo, conf);
+
+        Savepoint sp2 =
+                new Savepoint(
+                        9999999999999999L,
+                        "sp2",
+                        SavepointTriggerType.MANUAL,
+                        SavepointFormatType.CANONICAL,
+                        123L);
+        spInfo.updateLastSavepoint(sp2);
+        observer.cleanupSavepointHistory(flinkService, spInfo, conf);
+        Assertions.assertIterableEquals(List.of(sp1, sp2), 
spInfo.getSavepointHistory());
+        Assertions.assertIterableEquals(
+                Collections.emptyList(), flinkService.getDisposedSavepoints());
+    }
+
     @Test
     public void testPeriodicSavepoint() {
         var conf = new Configuration();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtilsTest.java
index 13224ec0..13ca0861 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtilsTest.java
@@ -58,7 +58,7 @@ public class FlinkResourceExceptionUtilsTest {
         assertEquals(
                 
KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_ENABLED
                         .defaultValue(),
-                
configManager.getOperatorConfiguration().getExceptionStackTraceEnabled());
+                
configManager.getOperatorConfiguration().isExceptionStackTraceEnabled());
         assertEquals(
                 
KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_MAX_LENGTH
                         .defaultValue(),
@@ -172,7 +172,7 @@ public class FlinkResourceExceptionUtilsTest {
                                                         
.getExceptionFieldLengthThreshold());
                             });
 
-            if 
(!configManager.getOperatorConfiguration().getExceptionStackTraceEnabled()) {
+            if 
(!configManager.getOperatorConfiguration().isExceptionStackTraceEnabled()) {
                 assertNull(flinkResourceException.getStackTrace());
             } else {
                 assertTrue(

Reply via email to