This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 5328e6b  [FLINK-27458] Expose allowNonRestoredState flag in JobSpec
5328e6b is described below

commit 5328e6b434d35c1b5689a0a4b26719be48c22253
Author: Aitozi <[email protected]>
AuthorDate: Thu May 5 21:18:25 2022 +0800

    [FLINK-27458] Expose allowNonRestoredState flag in JobSpec
---
 docs/content/docs/custom-resource/reference.md     |  1 +
 .../operator/config/FlinkConfigBuilder.java        |  6 ++
 .../kubernetes/operator/crd/spec/JobSpec.java      |  3 +
 .../kubernetes/operator/service/FlinkService.java  |  2 +-
 .../operator/config/FlinkConfigBuilderTest.java    | 89 +++++++++++++++-------
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  2 +
 .../crds/flinksessionjobs.flink.apache.org-v1.yml  |  2 +
 7 files changed, 76 insertions(+), 29 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index d826df7..15112b0 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -117,6 +117,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger savepoint for the running job. In order to trigger a  savepoint, change the number to anything other than the current value. |
 | initialSavepointPath | java.lang.String | Savepoint path used by the job the first time it is deployed. Upgrades/redeployments will not  be affected. |
 | upgradeMode | org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode | Upgrade mode of the Flink job. |
+| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
 
 ### JobState
 **Class**: org.apache.flink.kubernetes.operator.crd.spec.JobState
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 94e70dc..5712ffa 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
@@ -216,6 +217,11 @@ public class FlinkConfigBuilder {
                 effectiveConfig.set(
                         CoreOptions.DEFAULT_PARALLELISM, 
spec.getJob().getParallelism());
             }
+            if (spec.getJob().getAllowNonRestoredState() != null) {
+                effectiveConfig.set(
+                        
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
+                        spec.getJob().getAllowNonRestoredState());
+            }
         } else {
             effectiveConfig.set(
                     DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName());
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
index c78e91b..a657499 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -65,4 +65,7 @@ public class JobSpec {
 
     /** Upgrade mode of the Flink job. */
     @EqualsAndHashCode.Exclude private UpgradeMode upgradeMode = 
UpgradeMode.STATELESS;
+
+    /** Allow checkpoint state that cannot be mapped to any job vertex in tasks. */
+    @EqualsAndHashCode.Exclude private Boolean allowNonRestoredState;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c2c3ee4..7c9fa0c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -188,7 +188,7 @@ public class FlinkService {
                             job.getArgs() == null ? null : 
Arrays.asList(job.getArgs()),
                             job.getParallelism() > 0 ? job.getParallelism() : 
null,
                             jobID,
-                            null,
+                            job.getAllowNonRestoredState(),
                             savepoint);
             LOG.info("Submitting job: {} to session cluster.", 
jobID.toHexString());
             return clusterClient
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 17d1d99..d26d455 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -36,18 +36,20 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.Pod;
-import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -88,7 +90,7 @@ public class FlinkConfigBuilderTest {
     public void testApplyImage() {
         final Configuration configuration =
                 new FlinkConfigBuilder(flinkDeployment, new 
Configuration()).applyImage().build();
-        Assert.assertEquals(IMAGE, 
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE));
+        Assertions.assertEquals(IMAGE, 
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE));
     }
 
     @Test
@@ -97,7 +99,7 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyImagePullPolicy()
                         .build();
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 IMAGE_POLICY,
                 
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY).toString());
     }
@@ -108,11 +110,11 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyFlinkConfiguration()
                         .build();
-        Assert.assertEquals(2, (int) 
configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
-        Assert.assertEquals(
+        Assertions.assertEquals(2, (int) 
configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
+        Assertions.assertEquals(
                 KubernetesConfigOptions.ServiceExposedType.ClusterIP,
                 
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
-        Assert.assertEquals(false, 
configuration.get(WebOptions.CANCEL_ENABLE));
+        Assertions.assertEquals(false, 
configuration.get(WebOptions.CANCEL_ENABLE));
 
         FlinkDeployment deployment = 
ReconciliationUtils.clone(flinkDeployment);
         deployment
@@ -126,7 +128,7 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(deployment, new Configuration())
                         .applyFlinkConfiguration()
                         .build();
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
                 
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
 
@@ -141,7 +143,7 @@ public class FlinkConfigBuilderTest {
                                                         .getCanonicalName()))
                         .applyFlinkConfiguration()
                         .build();
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 DEFAULT_CHECKPOINTING_INTERVAL,
                 
configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL));
     }
@@ -157,8 +159,8 @@ public class FlinkConfigBuilderTest {
                 new File(
                         configuration.get(DeploymentOptionsInternal.CONF_DIR),
                         CONFIG_FILE_LOG4J_NAME);
-        Assert.assertTrue(log4jFile.exists() && log4jFile.isFile() && 
log4jFile.canRead());
-        Assert.assertEquals(CUSTOM_LOG_CONFIG, 
Files.readString(log4jFile.toPath()));
+        Assertions.assertTrue(log4jFile.exists() && log4jFile.isFile() && 
log4jFile.canRead());
+        Assertions.assertEquals(CUSTOM_LOG_CONFIG, 
Files.readString(log4jFile.toPath()));
     }
 
     @Test
@@ -179,8 +181,8 @@ public class FlinkConfigBuilderTest {
                                 configuration.getString(
                                         
KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
                         Pod.class);
-        Assert.assertEquals("container0", 
jmPod.getSpec().getContainers().get(0).getName());
-        Assert.assertEquals("container0", 
tmPod.getSpec().getContainers().get(0).getName());
+        Assertions.assertEquals("container0", 
jmPod.getSpec().getContainers().get(0).getName());
+        Assertions.assertEquals("container0", 
tmPod.getSpec().getContainers().get(0).getName());
     }
 
     @Test
@@ -189,7 +191,7 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyIngressDomain()
                         .build();
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 KubernetesConfigOptions.ServiceExposedType.ClusterIP,
                 
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
     }
@@ -200,7 +202,7 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyServiceAccount()
                         .build();
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 SERVICE_ACCOUNT,
                 
configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
     }
@@ -212,14 +214,14 @@ public class FlinkConfigBuilderTest {
                         .applyJobManagerSpec()
                         .build();
 
-        Assert.assertNull(
+        Assertions.assertNull(
                 
configuration.getString(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE));
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 MemorySize.parse("2048m"),
                 configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 Double.valueOf(1), 
configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 Integer.valueOf(2),
                 
configuration.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS));
 
@@ -240,7 +242,7 @@ public class FlinkConfigBuilderTest {
                                 configuration.getString(
                                         
KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)),
                         Pod.class);
-        Assert.assertEquals("pod1 api version", jmPod.getApiVersion());
+        Assertions.assertEquals("pod1 api version", jmPod.getApiVersion());
     }
 
     @Test
@@ -253,12 +255,12 @@ public class FlinkConfigBuilderTest {
                         .applyTaskManagerSpec()
                         .build();
 
-        Assert.assertNull(
+        Assertions.assertNull(
                 
configuration.getString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 MemorySize.parse("2048m"),
                 configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 Double.valueOf(1), 
configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
 
         deploymentClone
@@ -278,20 +280,51 @@ public class FlinkConfigBuilderTest {
                                 configuration.getString(
                                         
KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
                         Pod.class);
-        Assert.assertEquals("pod2 api version", tmPod.getApiVersion());
+        Assertions.assertEquals("pod2 api version", tmPod.getApiVersion());
     }
 
     @Test
     public void testApplyJobOrSessionSpec() throws Exception {
+        flinkDeployment.getSpec().getJob().setAllowNonRestoredState(true);
         final Configuration configuration =
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyJobOrSessionSpec()
                         .build();
-        Assert.assertEquals(
+        Assertions.assertTrue(
+                
configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
+        Assertions.assertEquals(
                 KubernetesDeploymentTarget.APPLICATION.getName(),
                 configuration.get(DeploymentOptions.TARGET));
-        Assert.assertEquals(SAMPLE_JAR, 
configuration.get(PipelineOptions.JARS).get(0));
-        Assert.assertEquals(Integer.valueOf(2), 
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+        Assertions.assertEquals(SAMPLE_JAR, 
configuration.get(PipelineOptions.JARS).get(0));
+        Assertions.assertEquals(
+                Integer.valueOf(2), 
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+    }
+
+    @Test
+    public void testAllowNonRestoredStateInSpecOverrideInFlinkConf() throws 
URISyntaxException {
+        flinkDeployment.getSpec().getJob().setAllowNonRestoredState(false);
+        flinkDeployment
+                .getSpec()
+                .getFlinkConfiguration()
+                
.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyJobOrSessionSpec()
+                        .build();
+        Assertions.assertFalse(
+                
configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
+
+        flinkDeployment.getSpec().getJob().setAllowNonRestoredState(true);
+        flinkDeployment
+                .getSpec()
+                .getFlinkConfiguration()
+                
.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "false");
+        configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyJobOrSessionSpec()
+                        .build();
+        Assertions.assertTrue(
+                
configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
     }
 
     @Test
@@ -306,7 +339,7 @@ public class FlinkConfigBuilderTest {
         final String clusterId = flinkDeployment.getMetadata().getName();
         // Most configs have been tested by previous unit tests, thus we only 
verify the namespace
         // and clusterId here.
-        Assert.assertEquals(namespace, 
configuration.get(KubernetesConfigOptions.NAMESPACE));
-        Assert.assertEquals(clusterId, 
configuration.get(KubernetesConfigOptions.CLUSTER_ID));
+        Assertions.assertEquals(namespace, 
configuration.get(KubernetesConfigOptions.NAMESPACE));
+        Assertions.assertEquals(clusterId, 
configuration.get(KubernetesConfigOptions.CLUSTER_ID));
     }
 }
diff --git 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index f6ef559..eb30ae8 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9075,6 +9075,8 @@ spec:
                     - last-state
                     - stateless
                     type: string
+                  allowNonRestoredState:
+                    type: boolean
                 type: object
               restartNonce:
                 type: integer
diff --git 
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index 51ea512..e94e543 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -48,6 +48,8 @@ spec:
                     - last-state
                     - stateless
                     type: string
+                  allowNonRestoredState:
+                    type: boolean
                 type: object
               restartNonce:
                 type: integer

Reply via email to