This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 5328e6b [FLINK-27458] Expose allowNonRestoredState flag in JobSpec
5328e6b is described below
commit 5328e6b434d35c1b5689a0a4b26719be48c22253
Author: Aitozi <[email protected]>
AuthorDate: Thu May 5 21:18:25 2022 +0800
[FLINK-27458] Expose allowNonRestoredState flag in JobSpec
---
docs/content/docs/custom-resource/reference.md | 1 +
.../operator/config/FlinkConfigBuilder.java | 6 ++
.../kubernetes/operator/crd/spec/JobSpec.java | 3 +
.../kubernetes/operator/service/FlinkService.java | 2 +-
.../operator/config/FlinkConfigBuilderTest.java | 89 +++++++++++++++-------
.../crds/flinkdeployments.flink.apache.org-v1.yml | 2 +
.../crds/flinksessionjobs.flink.apache.org-v1.yml | 2 +
7 files changed, 76 insertions(+), 29 deletions(-)
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index d826df7..15112b0 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -117,6 +117,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger
savepoint for the running job. In order to trigger a savepoint, change the
number to anything other than the current value. |
| initialSavepointPath | java.lang.String | Savepoint path used by the job the
first time it is deployed. Upgrades/redeployments will not be affected. |
| upgradeMode | org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode |
Upgrade mode of the Flink job. |
+| allowNonRestoredState | java.lang.Boolean | Allow the job to start even if the restored checkpoint or savepoint contains state that cannot be mapped to any job vertex; such state is skipped. |
### JobState
**Class**: org.apache.flink.kubernetes.operator.crd.spec.JobState
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 94e70dc..5712ffa 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -32,6 +32,7 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;
@@ -216,6 +217,11 @@ public class FlinkConfigBuilder {
effectiveConfig.set(
CoreOptions.DEFAULT_PARALLELISM,
spec.getJob().getParallelism());
}
+ if (spec.getJob().getAllowNonRestoredState() != null) {
+ effectiveConfig.set(
+
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
+ spec.getJob().getAllowNonRestoredState());
+ }
} else {
effectiveConfig.set(
DeploymentOptions.TARGET,
KubernetesDeploymentTarget.SESSION.getName());
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
index c78e91b..a657499 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -65,4 +65,7 @@ public class JobSpec {
/** Upgrade mode of the Flink job. */
@EqualsAndHashCode.Exclude private UpgradeMode upgradeMode =
UpgradeMode.STATELESS;
+
+ /** Allow checkpoint state that cannot be mapped to any job vertex in
tasks. */
+ @EqualsAndHashCode.Exclude private Boolean allowNonRestoredState;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c2c3ee4..7c9fa0c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -188,7 +188,7 @@ public class FlinkService {
job.getArgs() == null ? null :
Arrays.asList(job.getArgs()),
job.getParallelism() > 0 ? job.getParallelism() :
null,
jobID,
- null,
+ job.getAllowNonRestoredState(),
savepoint);
LOG.info("Submitting job: {} to session cluster.",
jobID.toHexString());
return clusterClient
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 17d1d99..d26d455 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -36,18 +36,20 @@ import
org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.Pod;
-import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
@@ -88,7 +90,7 @@ public class FlinkConfigBuilderTest {
public void testApplyImage() {
final Configuration configuration =
new FlinkConfigBuilder(flinkDeployment, new
Configuration()).applyImage().build();
- Assert.assertEquals(IMAGE,
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE));
+ Assertions.assertEquals(IMAGE,
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE));
}
@Test
@@ -97,7 +99,7 @@ public class FlinkConfigBuilderTest {
new FlinkConfigBuilder(flinkDeployment, new Configuration())
.applyImagePullPolicy()
.build();
- Assert.assertEquals(
+ Assertions.assertEquals(
IMAGE_POLICY,
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY).toString());
}
@@ -108,11 +110,11 @@ public class FlinkConfigBuilderTest {
new FlinkConfigBuilder(flinkDeployment, new Configuration())
.applyFlinkConfiguration()
.build();
- Assert.assertEquals(2, (int)
configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
- Assert.assertEquals(
+ Assertions.assertEquals(2, (int)
configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
+ Assertions.assertEquals(
KubernetesConfigOptions.ServiceExposedType.ClusterIP,
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
- Assert.assertEquals(false,
configuration.get(WebOptions.CANCEL_ENABLE));
+ Assertions.assertEquals(false,
configuration.get(WebOptions.CANCEL_ENABLE));
FlinkDeployment deployment =
ReconciliationUtils.clone(flinkDeployment);
deployment
@@ -126,7 +128,7 @@ public class FlinkConfigBuilderTest {
new FlinkConfigBuilder(deployment, new Configuration())
.applyFlinkConfiguration()
.build();
- Assert.assertEquals(
+ Assertions.assertEquals(
KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
@@ -141,7 +143,7 @@ public class FlinkConfigBuilderTest {
.getCanonicalName()))
.applyFlinkConfiguration()
.build();
- Assert.assertEquals(
+ Assertions.assertEquals(
DEFAULT_CHECKPOINTING_INTERVAL,
configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL));
}
@@ -157,8 +159,8 @@ public class FlinkConfigBuilderTest {
new File(
configuration.get(DeploymentOptionsInternal.CONF_DIR),
CONFIG_FILE_LOG4J_NAME);
- Assert.assertTrue(log4jFile.exists() && log4jFile.isFile() &&
log4jFile.canRead());
- Assert.assertEquals(CUSTOM_LOG_CONFIG,
Files.readString(log4jFile.toPath()));
+ Assertions.assertTrue(log4jFile.exists() && log4jFile.isFile() &&
log4jFile.canRead());
+ Assertions.assertEquals(CUSTOM_LOG_CONFIG,
Files.readString(log4jFile.toPath()));
}
@Test
@@ -179,8 +181,8 @@ public class FlinkConfigBuilderTest {
configuration.getString(
KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
Pod.class);
- Assert.assertEquals("container0",
jmPod.getSpec().getContainers().get(0).getName());
- Assert.assertEquals("container0",
tmPod.getSpec().getContainers().get(0).getName());
+ Assertions.assertEquals("container0",
jmPod.getSpec().getContainers().get(0).getName());
+ Assertions.assertEquals("container0",
tmPod.getSpec().getContainers().get(0).getName());
}
@Test
@@ -189,7 +191,7 @@ public class FlinkConfigBuilderTest {
new FlinkConfigBuilder(flinkDeployment, new Configuration())
.applyIngressDomain()
.build();
- Assert.assertEquals(
+ Assertions.assertEquals(
KubernetesConfigOptions.ServiceExposedType.ClusterIP,
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
}
@@ -200,7 +202,7 @@ public class FlinkConfigBuilderTest {
new FlinkConfigBuilder(flinkDeployment, new Configuration())
.applyServiceAccount()
.build();
- Assert.assertEquals(
+ Assertions.assertEquals(
SERVICE_ACCOUNT,
configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
}
@@ -212,14 +214,14 @@ public class FlinkConfigBuilderTest {
.applyJobManagerSpec()
.build();
- Assert.assertNull(
+ Assertions.assertNull(
configuration.getString(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE));
- Assert.assertEquals(
+ Assertions.assertEquals(
MemorySize.parse("2048m"),
configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
- Assert.assertEquals(
+ Assertions.assertEquals(
Double.valueOf(1),
configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
- Assert.assertEquals(
+ Assertions.assertEquals(
Integer.valueOf(2),
configuration.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS));
@@ -240,7 +242,7 @@ public class FlinkConfigBuilderTest {
configuration.getString(
KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)),
Pod.class);
- Assert.assertEquals("pod1 api version", jmPod.getApiVersion());
+ Assertions.assertEquals("pod1 api version", jmPod.getApiVersion());
}
@Test
@@ -253,12 +255,12 @@ public class FlinkConfigBuilderTest {
.applyTaskManagerSpec()
.build();
- Assert.assertNull(
+ Assertions.assertNull(
configuration.getString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
- Assert.assertEquals(
+ Assertions.assertEquals(
MemorySize.parse("2048m"),
configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
- Assert.assertEquals(
+ Assertions.assertEquals(
Double.valueOf(1),
configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
deploymentClone
@@ -278,20 +280,51 @@ public class FlinkConfigBuilderTest {
configuration.getString(
KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
Pod.class);
- Assert.assertEquals("pod2 api version", tmPod.getApiVersion());
+ Assertions.assertEquals("pod2 api version", tmPod.getApiVersion());
}
@Test
public void testApplyJobOrSessionSpec() throws Exception {
+ flinkDeployment.getSpec().getJob().setAllowNonRestoredState(true);
final Configuration configuration =
new FlinkConfigBuilder(flinkDeployment, new Configuration())
.applyJobOrSessionSpec()
.build();
- Assert.assertEquals(
+ Assertions.assertTrue(
+
configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
+ Assertions.assertEquals(
KubernetesDeploymentTarget.APPLICATION.getName(),
configuration.get(DeploymentOptions.TARGET));
- Assert.assertEquals(SAMPLE_JAR,
configuration.get(PipelineOptions.JARS).get(0));
- Assert.assertEquals(Integer.valueOf(2),
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+ Assertions.assertEquals(SAMPLE_JAR,
configuration.get(PipelineOptions.JARS).get(0));
+ Assertions.assertEquals(
+ Integer.valueOf(2),
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+ }
+
+ @Test
+ public void testAllowNonRestoredStateInSpecOverrideInFlinkConf() throws
URISyntaxException {
+ flinkDeployment.getSpec().getJob().setAllowNonRestoredState(false);
+ flinkDeployment
+ .getSpec()
+ .getFlinkConfiguration()
+
.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "true");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyJobOrSessionSpec()
+ .build();
+ Assertions.assertFalse(
+
configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
+
+ flinkDeployment.getSpec().getJob().setAllowNonRestoredState(true);
+ flinkDeployment
+ .getSpec()
+ .getFlinkConfiguration()
+
.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), "false");
+ configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyJobOrSessionSpec()
+ .build();
+ Assertions.assertTrue(
+
configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
}
@Test
@@ -306,7 +339,7 @@ public class FlinkConfigBuilderTest {
final String clusterId = flinkDeployment.getMetadata().getName();
// Most configs have been tested by previous unit tests, thus we only
verify the namespace
// and clusterId here.
- Assert.assertEquals(namespace,
configuration.get(KubernetesConfigOptions.NAMESPACE));
- Assert.assertEquals(clusterId,
configuration.get(KubernetesConfigOptions.CLUSTER_ID));
+ Assertions.assertEquals(namespace,
configuration.get(KubernetesConfigOptions.NAMESPACE));
+ Assertions.assertEquals(clusterId,
configuration.get(KubernetesConfigOptions.CLUSTER_ID));
}
}
diff --git
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index f6ef559..eb30ae8 100644
---
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9075,6 +9075,8 @@ spec:
- last-state
- stateless
type: string
+ allowNonRestoredState:
+ type: boolean
type: object
restartNonce:
type: integer
diff --git
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index 51ea512..e94e543 100644
---
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -48,6 +48,8 @@ spec:
- last-state
- stateless
type: string
+ allowNonRestoredState:
+ type: boolean
type: object
restartNonce:
type: integer