This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b9e6b7a  [FLINK-26893] Validate checkpoint config with last-state 
upgrade mode (#139)
b9e6b7a is described below

commit b9e6b7a196d4d489b5d15fd03c026c5b6f543d52
Author: Nicholas Jiang <[email protected]>
AuthorDate: Sun Apr 3 15:06:49 2022 +0800

    [FLINK-26893] Validate checkpoint config with last-state upgrade mode (#139)
---
 .../operator/utils/FlinkConfigBuilder.java           | 16 ++++++++++++++++
 .../operator/utils/FlinkConfigBuilderTest.java       | 20 ++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index cc59838..e84ca4b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -30,6 +30,8 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.util.StringUtils;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -41,6 +43,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
+import java.time.Duration;
 import java.util.Collections;
 
 import static 
org.apache.flink.configuration.DeploymentOptionsInternal.CONF_DIR;
@@ -55,6 +58,8 @@ public class FlinkConfigBuilder {
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
 
+    public static final Duration DEFAULT_CHECKPOINTING_INTERVAL = 
Duration.ofMinutes(5);
+
     public FlinkConfigBuilder(FlinkDeployment deploy, Configuration 
flinkConfig) {
         this(deploy.getMetadata(), deploy.getSpec(), flinkConfig);
     }
@@ -94,6 +99,17 @@ public class FlinkConfigBuilder {
                     REST_SERVICE_EXPOSED_TYPE,
                     KubernetesConfigOptions.ServiceExposedType.ClusterIP);
         }
+
+        // With last-state upgrade mode, set the default value of 
'execution.checkpointing.interval'
+        // to 5 minutes when HA is enabled.
+        if (spec.getJob() != null
+                && spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
+                && !effectiveConfig.contains(
+                        ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) 
{
+            effectiveConfig.set(
+                    ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
+                    DEFAULT_CHECKPOINTING_INTERVAL);
+        }
         return this;
     }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index 638f19c..c81f7ce 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -21,17 +21,21 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.utils.Constants;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -52,6 +56,7 @@ import static 
org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
 import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE_POLICY;
 import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
 import static org.apache.flink.kubernetes.operator.TestUtils.SERVICE_ACCOUNT;
+import static 
org.apache.flink.kubernetes.operator.utils.FlinkConfigBuilder.DEFAULT_CHECKPOINTING_INTERVAL;
 import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
 
 /** FlinkConfigBuilderTest. */
@@ -128,6 +133,21 @@ public class FlinkConfigBuilderTest {
         Assert.assertEquals(
                 KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
                 
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
+
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        configuration =
+                new FlinkConfigBuilder(
+                                deployment,
+                                new Configuration()
+                                        .set(
+                                                
HighAvailabilityOptions.HA_MODE,
+                                                
KubernetesHaServicesFactory.class
+                                                        .getCanonicalName()))
+                        .applyFlinkConfiguration()
+                        .build();
+        Assert.assertEquals(
+                DEFAULT_CHECKPOINTING_INTERVAL,
+                
configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL));
     }
 
     @Test

Reply via email to