This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 6df91ae1 [FLINK-31795] Add config to disable savepoint cleanup 6df91ae1 is described below commit 6df91ae17873c274b2b993ab7bef30f1297e5c3e Author: darenwkt <108749182+daren...@users.noreply.github.com> AuthorDate: Wed May 3 16:39:16 2023 +0100 [FLINK-31795] Add config to disable savepoint cleanup --- .../shortcodes/generated/dynamic_section.html | 6 ++++ .../kubernetes_operator_config_configuration.html | 6 ++++ .../config/FlinkOperatorConfiguration.java | 2 +- .../config/KubernetesOperatorConfigOptions.java | 7 ++++ .../operator/observer/SavepointObserver.java | 12 +++++-- .../utils/FlinkResourceExceptionUtils.java | 2 +- .../operator/observer/SavepointObserverTest.java | 38 ++++++++++++++++++++++ .../utils/FlinkResourceExceptionUtilsTest.java | 4 +-- 8 files changed, 71 insertions(+), 6 deletions(-) diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html index 3179183f..9662e8f1 100644 --- a/docs/layouts/shortcodes/generated/dynamic_section.html +++ b/docs/layouts/shortcodes/generated/dynamic_section.html @@ -98,6 +98,12 @@ <td>Boolean</td> <td>Configure the array merge behaviour during pod merging. 
Arrays can be either merged by position or name matching.</td> </tr> + <tr> + <td><h5>kubernetes.operator.savepoint.cleanup.enabled</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Whether to enable clean up of savepoint history.</td> + </tr> <tr> <td><h5>kubernetes.operator.savepoint.format.type</h5></td> <td style="word-wrap: break-word;">CANONICAL</td> diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index 5495b2af..7d87aa71 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -272,6 +272,12 @@ <td>Integer</td> <td>Max attempts of automatic reconcile retries on recoverable errors.</td> </tr> + <tr> + <td><h5>kubernetes.operator.savepoint.cleanup.enabled</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Whether to enable clean up of savepoint history.</td> + </tr> <tr> <td><h5>kubernetes.operator.savepoint.format.type</h5></td> <td style="word-wrap: break-word;">CANONICAL</td> diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java index b29077aa..f32e57d8 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -62,7 +62,7 @@ public class FlinkOperatorConfiguration { Integer savepointHistoryCountThreshold; Duration savepointHistoryAgeThreshold; RetryConfiguration retryConfiguration; - Boolean exceptionStackTraceEnabled; + boolean exceptionStackTraceEnabled; int 
exceptionStackTraceLengthThreshold; int exceptionFieldLengthThreshold; int exceptionThrowableCountThreshold; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index 820eb221..3b296758 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -189,6 +189,13 @@ public class KubernetesOperatorConfigOptions { .withDescription( "Whether to enable recovery of missing/deleted jobmanager deployments."); + @Documentation.Section(SECTION_DYNAMIC) + public static final ConfigOption<Boolean> OPERATOR_SAVEPOINT_CLEANUP_ENABLED = + operatorConfig("savepoint.cleanup.enabled") + .booleanType() + .defaultValue(true) + .withDescription("Whether to enable clean up of savepoint history."); + @Documentation.Section(SECTION_DYNAMIC) public static final ConfigOption<Integer> OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT = operatorConfig("savepoint.history.max.count") diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java index 8b7042e1..fef3a0a0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java @@ -141,6 +141,9 @@ public class SavepointObserver< FlinkService flinkService, SavepointInfo currentSavepointInfo, Configuration observeConfig) { + final boolean savepointCleanupEnabled = + observeConfig.getBoolean( + 
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED); // maintain history List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory(); @@ -161,7 +164,10 @@ public class SavepointObserver< .getSavepointHistoryCountThreshold())); while (savepointHistory.size() > maxCount) { // remove oldest entries - disposeSavepointQuietly(flinkService, savepointHistory.remove(0), observeConfig); + Savepoint sp = savepointHistory.remove(0); + if (savepointCleanupEnabled) { + disposeSavepointQuietly(flinkService, sp, observeConfig); + } } Duration maxAge = @@ -175,7 +181,9 @@ public class SavepointObserver< Savepoint sp = it.next(); if (sp.getTimeStamp() < maxTms && sp != lastSavepoint) { it.remove(); - disposeSavepointQuietly(flinkService, sp, observeConfig); + if (savepointCleanupEnabled) { + disposeSavepointQuietly(flinkService, sp, observeConfig); + } } } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java index fad62b6f..78b26414 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java @@ -41,7 +41,7 @@ public final class FlinkResourceExceptionUtils { public static <R extends AbstractFlinkResource> void updateFlinkResourceException( Throwable throwable, R resource, FlinkOperatorConfiguration conf) { - boolean stackTraceEnabled = conf.getExceptionStackTraceEnabled(); + boolean stackTraceEnabled = conf.isExceptionStackTraceEnabled(); int stackTraceLengthThreshold = conf.getExceptionStackTraceLengthThreshold(); int lengthThreshold = conf.getExceptionFieldLengthThreshold(); int throwableCountThreshold = conf.getExceptionThrowableCountThreshold(); diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java index ea56aac4..2b1bf201 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Collections; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -138,6 +139,43 @@ public class SavepointObserverTest extends OperatorTestBase { configManager.updateDefaultConfig(new Configuration()); } + @Test + public void testDisabledDispose() { + Configuration conf = new Configuration(); + conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED, false); + conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 1000); + conf.set( + KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE, + Duration.ofDays(100L)); + + configManager.updateDefaultConfig(conf); + + SavepointInfo spInfo = new SavepointInfo(); + + Savepoint sp1 = + new Savepoint( + 9999999999999998L, + "sp1", + SavepointTriggerType.MANUAL, + SavepointFormatType.CANONICAL, + 123L); + spInfo.updateLastSavepoint(sp1); + observer.cleanupSavepointHistory(flinkService, spInfo, conf); + + Savepoint sp2 = + new Savepoint( + 9999999999999999L, + "sp2", + SavepointTriggerType.MANUAL, + SavepointFormatType.CANONICAL, + 123L); + spInfo.updateLastSavepoint(sp2); + observer.cleanupSavepointHistory(flinkService, spInfo, conf); + Assertions.assertIterableEquals(List.of(sp1, sp2), spInfo.getSavepointHistory()); + Assertions.assertIterableEquals( + Collections.emptyList(), 
flinkService.getDisposedSavepoints()); + } + @Test public void testPeriodicSavepoint() { var conf = new Configuration(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtilsTest.java index 13224ec0..13ca0861 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtilsTest.java @@ -58,7 +58,7 @@ public class FlinkResourceExceptionUtilsTest { assertEquals( KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_ENABLED .defaultValue(), - configManager.getOperatorConfiguration().getExceptionStackTraceEnabled()); + configManager.getOperatorConfiguration().isExceptionStackTraceEnabled()); assertEquals( KubernetesOperatorConfigOptions.OPERATOR_EXCEPTION_STACK_TRACE_MAX_LENGTH .defaultValue(), @@ -172,7 +172,7 @@ public class FlinkResourceExceptionUtilsTest { .getExceptionFieldLengthThreshold()); }); - if (!configManager.getOperatorConfiguration().getExceptionStackTraceEnabled()) { + if (!configManager.getOperatorConfiguration().isExceptionStackTraceEnabled()) { assertNull(flinkResourceException.getStackTrace()); } else { assertTrue(