This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new b9e6b7a [FLINK-26893] Validate checkpoint config with last-state
upgrade mode (#139)
b9e6b7a is described below
commit b9e6b7a196d4d489b5d15fd03c026c5b6f543d52
Author: Nicholas Jiang <[email protected]>
AuthorDate: Sun Apr 3 15:06:49 2022 +0800
[FLINK-26893] Validate checkpoint config with last-state upgrade mode (#139)
---
.../operator/utils/FlinkConfigBuilder.java | 16 ++++++++++++++++
.../operator/utils/FlinkConfigBuilderTest.java | 20 ++++++++++++++++++++
2 files changed, 36 insertions(+)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index cc59838..e84ca4b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -30,6 +30,8 @@ import
org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.StringUtils;
import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -41,6 +43,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
+import java.time.Duration;
import java.util.Collections;
import static
org.apache.flink.configuration.DeploymentOptionsInternal.CONF_DIR;
@@ -55,6 +58,8 @@ public class FlinkConfigBuilder {
private final FlinkDeploymentSpec spec;
private final Configuration effectiveConfig;
+ public static final Duration DEFAULT_CHECKPOINTING_INTERVAL =
Duration.ofMinutes(5);
+
public FlinkConfigBuilder(FlinkDeployment deploy, Configuration
flinkConfig) {
this(deploy.getMetadata(), deploy.getSpec(), flinkConfig);
}
@@ -94,6 +99,17 @@ public class FlinkConfigBuilder {
REST_SERVICE_EXPOSED_TYPE,
KubernetesConfigOptions.ServiceExposedType.ClusterIP);
}
+
+ // With last-state upgrade mode, default 'execution.checkpointing.interval'
+ // to 5 minutes if it is not already configured. HA is not checked here;
+ // it is assumed to be enabled whenever last-state upgrade mode is used.
+ if (spec.getJob() != null
+ && spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
+ && !effectiveConfig.contains(
+ ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL))
{
+ effectiveConfig.set(
+ ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
+ DEFAULT_CHECKPOINTING_INTERVAL);
+ }
return this;
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index 638f19c..c81f7ce 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -21,17 +21,21 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+import
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
+import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -52,6 +56,7 @@ import static
org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE_POLICY;
import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
import static org.apache.flink.kubernetes.operator.TestUtils.SERVICE_ACCOUNT;
+import static
org.apache.flink.kubernetes.operator.utils.FlinkConfigBuilder.DEFAULT_CHECKPOINTING_INTERVAL;
import static
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
/** FlinkConfigBuilderTest. */
@@ -128,6 +133,21 @@ public class FlinkConfigBuilderTest {
Assert.assertEquals(
KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
+
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ configuration =
+ new FlinkConfigBuilder(
+ deployment,
+ new Configuration()
+ .set(
+
HighAvailabilityOptions.HA_MODE,
+
KubernetesHaServicesFactory.class
+ .getCanonicalName()))
+ .applyFlinkConfiguration()
+ .build();
+ Assert.assertEquals(
+ DEFAULT_CHECKPOINTING_INTERVAL,
+
configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL));
}
@Test