This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 24643c8c [FLINK-31220] Replace Pod with PodTemplateSpec for the pod
template properties
24643c8c is described below
commit 24643c8c6d9d734732ed2cb7e3112c4452675f40
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Feb 6 21:49:21 2024 +0100
[FLINK-31220] Replace Pod with PodTemplateSpec for the pod template
properties
---
docs/content/docs/custom-resource/pod-template.md | 8 ----
docs/content/docs/custom-resource/reference.md | 6 +--
e2e-tests/data/multi-sessionjob.yaml | 4 --
examples/pod-template.yaml | 8 ----
.../operator/api/spec/FlinkDeploymentSpec.java | 5 +-
.../operator/api/spec/JobManagerSpec.java | 5 +-
.../operator/api/spec/TaskManagerSpec.java | 5 +-
.../operator/api/utils/BaseTestUtils.java | 15 ++++--
.../operator/config/FlinkConfigBuilder.java | 17 +++----
.../reconciler/diff/ReflectiveDiffBuilder.java | 46 ++++++++++++++++--
.../kubernetes/operator/utils/FlinkUtils.java | 9 ++--
.../operator/config/FlinkConfigBuilderTest.java | 54 +++++++++++-----------
.../operator/config/FlinkConfigManagerTest.java | 4 +-
.../operator/reconciler/diff/SpecDiffTest.java | 39 +++++++++++++---
.../kubernetes/operator/utils/FlinkUtilsTest.java | 25 ++++------
15 files changed, 152 insertions(+), 98 deletions(-)
diff --git a/docs/content/docs/custom-resource/pod-template.md
b/docs/content/docs/custom-resource/pod-template.md
index 6d704c92..bf7097e5 100644
--- a/docs/content/docs/custom-resource/pod-template.md
+++ b/docs/content/docs/custom-resource/pod-template.md
@@ -55,10 +55,6 @@ spec:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
podTemplate:
- apiVersion: v1
- kind: Pod
- metadata:
- name: pod-template
spec:
containers:
# Do not change the main container name
@@ -85,10 +81,6 @@ spec:
memory: "2048m"
cpu: 1
podTemplate:
- apiVersion: v1
- kind: Pod
- metadata:
- name: task-manager-pod-template
spec:
initContainers:
# Sample sidecar container
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index c7810eb2..9db6371e 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -53,7 +53,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| serviceAccount | java.lang.String | Kubernetes service used by the Flink
deployment. |
| flinkVersion | org.apache.flink.kubernetes.operator.api.spec.FlinkVersion |
Flink image version. |
| ingress | org.apache.flink.kubernetes.operator.api.spec.IngressSpec |
Ingress specs. |
-| podTemplate | io.fabric8.kubernetes.api.model.Pod | Base pod template for
job and task manager pods. Can be overridden by the jobManager and taskManager
pod templates. |
+| podTemplate | io.fabric8.kubernetes.api.model.PodTemplateSpec | Base pod
template for job and task manager pods. Can be overridden by the jobManager and
taskManager pod templates. |
| jobManager | org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec |
JobManager specs. |
| taskManager | org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec
| TaskManager specs. |
| logConfiguration | java.util.Map<java.lang.String,java.lang.String> | Log
configuration overrides for the Flink deployment. Format logConfigFileName ->
configContent. |
@@ -108,7 +108,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| ----------| ---- | ---- |
| resource | org.apache.flink.kubernetes.operator.api.spec.Resource | Resource
specification for the JobManager pods. |
| replicas | int | Number of JobManager replicas. Must be 1 for non-HA
deployments. |
-| podTemplate | io.fabric8.kubernetes.api.model.Pod | JobManager pod template.
It will be merged with FlinkDeploymentSpec.podTemplate. |
+| podTemplate | io.fabric8.kubernetes.api.model.PodTemplateSpec | JobManager
pod template. It will be merged with FlinkDeploymentSpec.podTemplate. |
### JobSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec
@@ -169,7 +169,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| ----------| ---- | ---- |
| resource | org.apache.flink.kubernetes.operator.api.spec.Resource | Resource
specification for the TaskManager pods. |
| replicas | java.lang.Integer | Number of TaskManager replicas. If defined,
takes precedence over parallelism |
-| podTemplate | io.fabric8.kubernetes.api.model.Pod | TaskManager pod
template. It will be merged with FlinkDeploymentSpec.podTemplate. |
+| podTemplate | io.fabric8.kubernetes.api.model.PodTemplateSpec | TaskManager
pod template. It will be merged with FlinkDeploymentSpec.podTemplate. |
### UpgradeMode
**Class**: org.apache.flink.kubernetes.operator.api.spec.UpgradeMode
diff --git a/e2e-tests/data/multi-sessionjob.yaml
b/e2e-tests/data/multi-sessionjob.yaml
index fd30c838..d10b6c11 100644
--- a/e2e-tests/data/multi-sessionjob.yaml
+++ b/e2e-tests/data/multi-sessionjob.yaml
@@ -85,10 +85,6 @@ spec:
state.savepoints.dir: file:///opt/flink/volume/flink-sp
serviceAccount: flink
podTemplate:
- apiVersion: v1
- kind: Pod
- metadata:
- name: pod-template
spec:
containers:
# Do not change the main container name
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index 17ec3c8a..bab35d4a 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -27,10 +27,6 @@ spec:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
podTemplate:
- apiVersion: v1
- kind: Pod
- metadata:
- name: pod-template
spec:
containers:
# Do not change the main container name
@@ -57,10 +53,6 @@ spec:
memory: "2048m"
cpu: 1
podTemplate:
- apiVersion: v1
- kind: Pod
- metadata:
- name: task-manager-pod-template
spec:
initContainers:
# Sample init container for fetching remote artifacts
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
index 78f49cf8..29e40906 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator.api.spec;
import org.apache.flink.annotation.Experimental;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.crd.generator.annotation.SchemaFrom;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -59,7 +61,8 @@ public class FlinkDeploymentSpec extends AbstractFlinkSpec {
* Base pod template for job and task manager pods. Can be overridden by
the jobManager and
* taskManager pod templates.
*/
- private Pod podTemplate;
+ @SchemaFrom(type = Pod.class)
+ private PodTemplateSpec podTemplate;
/** JobManager specs. */
private JobManagerSpec jobManager;
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
index 522a1e08..658d48fe 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator.api.spec;
import org.apache.flink.annotation.Experimental;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.crd.generator.annotation.SchemaFrom;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -41,5 +43,6 @@ public class JobManagerSpec {
private int replicas = 1;
/** JobManager pod template. It will be merged with
FlinkDeploymentSpec.podTemplate. */
- private Pod podTemplate;
+ @SchemaFrom(type = Pod.class)
+ private PodTemplateSpec podTemplate;
}
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
index 8758ebad..861303f4 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
@@ -23,7 +23,9 @@ import org.apache.flink.kubernetes.operator.api.diff.Diffable;
import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.crd.generator.annotation.SchemaFrom;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.model.annotation.SpecReplicas;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -47,5 +49,6 @@ public class TaskManagerSpec implements
Diffable<TaskManagerSpec> {
private Integer replicas;
/** TaskManager pod template. It will be merged with
FlinkDeploymentSpec.podTemplate. */
- private Pod podTemplate;
+ @SchemaFrom(type = Pod.class)
+ private PodTemplateSpec podTemplate;
}
diff --git
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index 89724023..8e887766 100644
---
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -39,6 +39,7 @@ import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import java.time.Instant;
import java.util.HashMap;
@@ -185,13 +186,21 @@ public class BaseTestUtils {
.build();
}
- public static Pod getTestPod(String hostname, String apiVersion,
List<Container> containers) {
+ public static PodTemplateSpec getTestPodTemplate(String hostname,
List<Container> containers) {
final PodSpec podSpec = new PodSpec();
podSpec.setHostname(hostname);
podSpec.setContainers(containers);
- final Pod pod = new Pod();
- pod.setApiVersion(apiVersion);
+ var pod = new PodTemplateSpec();
pod.setSpec(podSpec);
return pod;
}
+
+ public static Pod getTestPod(String hostname, String apiVersion,
List<Container> containers) {
+ var pod = new Pod();
+ var podTemplate = getTestPodTemplate(hostname, containers);
+ pod.setApiVersion(apiVersion);
+ pod.setSpec(podTemplate.getSpec());
+ pod.setMetadata(podTemplate.getMetadata());
+ return pod;
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 6caa46a3..339dfaa3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -52,8 +52,8 @@ import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;
import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.client.utils.Serialization;
@@ -189,11 +189,11 @@ public class FlinkConfigBuilder {
}
protected FlinkConfigBuilder applyPodTemplate() throws IOException {
- Pod commonPodTemplate = spec.getPodTemplate();
+ PodTemplateSpec commonPodTemplate = spec.getPodTemplate();
boolean mergeByName =
effectiveConfig.get(KubernetesOperatorConfigOptions.POD_TEMPLATE_MERGE_BY_NAME);
- Pod jmPodTemplate;
+ PodTemplateSpec jmPodTemplate;
if (spec.getJobManager() != null) {
jmPodTemplate =
mergePodTemplates(
@@ -208,7 +208,7 @@ public class FlinkConfigBuilder {
if (effectiveConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_JM_STARTUP_PROBE_ENABLED)) {
if (jmPodTemplate == null) {
- jmPodTemplate = new Pod();
+ jmPodTemplate = new PodTemplateSpec();
}
FlinkUtils.addStartupProbe(jmPodTemplate);
}
@@ -219,7 +219,7 @@ public class FlinkConfigBuilder {
effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE,
jmTemplateFile);
}
- Pod tmPodTemplate;
+ PodTemplateSpec tmPodTemplate;
if (spec.getTaskManager() != null) {
tmPodTemplate =
mergePodTemplates(
@@ -488,14 +488,15 @@ public class FlinkConfigBuilder {
}
@VisibleForTesting
- protected static Pod applyResourceToPodTemplate(Pod podTemplate, Resource
resource) {
+ protected static PodTemplateSpec applyResourceToPodTemplate(
+ PodTemplateSpec podTemplate, Resource resource) {
if (resource == null
||
StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())) {
return podTemplate;
}
if (podTemplate == null) {
- Pod newPodTemplate = new Pod();
+ var newPodTemplate = new PodTemplateSpec();
newPodTemplate.setSpec(createPodSpecWithResource(resource));
return newPodTemplate;
} else if (podTemplate.getSpec() == null) {
@@ -568,7 +569,7 @@ public class FlinkConfigBuilder {
return tmpDir.getAbsolutePath();
}
- private static String createTempFile(Pod podTemplate) throws IOException {
+ private static String createTempFile(PodTemplateSpec podTemplate) throws
IOException {
final File tmp = File.createTempFile(GENERATED_FILE_PREFIX +
"podTemplate_", ".yaml");
Files.write(tmp.toPath(),
Serialization.asYaml(podTemplate).getBytes());
tmp.deleteOnExit();
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
index 2a1fed4e..5ab52d7f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
@@ -21,8 +21,10 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.diff.Diffable;
import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import lombok.NonNull;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.ObjectUtils;
@@ -61,6 +63,8 @@ public class ReflectiveDiffBuilder<T> implements
Builder<DiffResult<T>> {
this.before = before;
this.after = after;
diffBuilder = new DiffBuilder<>(before, after);
+ clearIgnoredFields(before);
+ clearIgnoredFields(after);
}
@Override
@@ -79,6 +83,7 @@ public class ReflectiveDiffBuilder<T> implements
Builder<DiffResult<T>> {
try {
var leftField = readField(field, before, true);
var rightField = readField(field, after, true);
+
if (field.isAnnotationPresent(SpecDiff.Config.class)
&& Map.class.isAssignableFrom(field.getType())) {
diffBuilder.append(
@@ -112,11 +117,7 @@ public class ReflectiveDiffBuilder<T> implements
Builder<DiffResult<T>> {
.build());
} else {
- diffBuilder.append(
- field.getName(),
- readField(field, before, true),
- readField(field, after, true),
- UPGRADE);
+ diffBuilder.append(field.getName(), leftField,
rightField, UPGRADE);
}
} catch (final IllegalAccessException ex) {
@@ -172,4 +173,39 @@ public class ReflectiveDiffBuilder<T> implements
Builder<DiffResult<T>> {
}
return diffType;
}
+
+ /**
+ * This method is responsible for clearing / nulling out deprecated fields
that should be
+ * ignored during spec diff comparison. These fields may still be present
in the
+ * lastReconciledSpec.
+ *
+ * @param o Object to be cleaned.
+ */
+ private static void clearIgnoredFields(Object o) {
+ if (o == null) {
+ return;
+ }
+ if (o instanceof FlinkDeploymentSpec) {
+ var spec = (FlinkDeploymentSpec) o;
+ clearPodTemplateAdditionalProps(spec.getPodTemplate());
+ if (spec.getJobManager() != null) {
+
clearPodTemplateAdditionalProps(spec.getJobManager().getPodTemplate());
+ }
+ if (spec.getTaskManager() != null) {
+
clearPodTemplateAdditionalProps(spec.getTaskManager().getPodTemplate());
+ }
+ }
+ }
+
+ /**
+ * Removes additional properties from a deserialized PodTemplateSpec that may be left over
+ * from the move from Pod to PodTemplateSpec.
+ *
+ * @param o Object to be cleaned.
+ */
+ private static void clearPodTemplateAdditionalProps(Object o) {
+ if (o != null && o instanceof PodTemplateSpec) {
+ ((PodTemplateSpec) o).setAdditionalProperties(null);
+ }
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index cd12f056..10288c70 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -41,8 +41,8 @@ import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.HTTPGetAction;
import io.fabric8.kubernetes.api.model.IntOrString;
-import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.Probe;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
@@ -69,7 +69,8 @@ public class FlinkUtils {
public static final String CR_GENERATION_LABEL =
"flinkdeployment.flink.apache.org/generation";
- public static Pod mergePodTemplates(Pod toPod, Pod fromPod, boolean
mergeArraysByName) {
+ public static PodTemplateSpec mergePodTemplates(
+ PodTemplateSpec toPod, PodTemplateSpec fromPod, boolean
mergeArraysByName) {
if (fromPod == null) {
return ReconciliationUtils.clone(toPod);
} else if (toPod == null) {
@@ -79,7 +80,7 @@ public class FlinkUtils {
JsonNode node2 = MAPPER.valueToTree(fromPod);
mergeInto(node1, node2, mergeArraysByName);
try {
- return MAPPER.treeToValue(node1, Pod.class);
+ return MAPPER.treeToValue(node1, PodTemplateSpec.class);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -152,7 +153,7 @@ public class FlinkUtils {
return out;
}
- public static void addStartupProbe(Pod pod) {
+ public static void addStartupProbe(PodTemplateSpec pod) {
var spec = pod.getSpec();
if (spec == null) {
spec = new PodSpec();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 4c905c3f..04a7c842 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -52,7 +52,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -209,7 +209,9 @@ public class FlinkConfigBuilderTest {
mainContainer.setName(Constants.MAIN_CONTAINER_NAME);
mainContainer.setImage("test");
- flinkDeployment.getSpec().setPodTemplate(TestUtils.getTestPod("", "",
List.of(container0)));
+ flinkDeployment
+ .getSpec()
+ .setPodTemplate(TestUtils.getTestPodTemplate("",
List.of(container0)));
var inConfig = new Configuration();
inConfig.set(KubernetesOperatorConfigOptions.OPERATOR_JM_STARTUP_PROBE_ENABLED,
false);
@@ -227,18 +229,22 @@ public class FlinkConfigBuilderTest {
List.of(container0),
getJmPod(configuration).getSpec().getContainers());
flinkDeployment.getSpec().getJobManager().getResource().setEphemeralStorage("2G");
-
flinkDeployment.getSpec().getTaskManager().getResource().setEphemeralStorage("2G");
+
flinkDeployment.getSpec().getTaskManager().getResource().setEphemeralStorage("3G");
configuration =
new FlinkConfigBuilder(flinkDeployment, inConfig.clone())
.applyPodTemplate()
.build();
+
+ Assertions.assertNotEquals(
+
configuration.getString(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE),
+
configuration.getString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
assertMainContainerEphemeralStorage(
getJmPod(configuration).getSpec().getContainers().get(1),
"2G");
Assertions.assertEquals(
container0,
getJmPod(configuration).getSpec().getContainers().get(0));
assertMainContainerEphemeralStorage(
- getTmPod(configuration).getSpec().getContainers().get(1),
"2G");
+ getTmPod(configuration).getSpec().getContainers().get(1),
"3G");
Assertions.assertEquals(
container0,
getTmPod(configuration).getSpec().getContainers().get(0));
@@ -283,11 +289,12 @@ public class FlinkConfigBuilderTest {
flinkDeployment
.getSpec()
.getJobManager()
- .setPodTemplate(TestUtils.getTestPod("", "",
List.of(mainContainer, container0)));
+ .setPodTemplate(
+ TestUtils.getTestPodTemplate("",
List.of(mainContainer, container0)));
flinkDeployment
.getSpec()
.getTaskManager()
- .setPodTemplate(TestUtils.getTestPod("", "",
List.of(container1)));
+ .setPodTemplate(TestUtils.getTestPodTemplate("",
List.of(container1)));
configuration =
new FlinkConfigBuilder(flinkDeployment, inConfig.clone())
@@ -308,7 +315,7 @@ public class FlinkConfigBuilderTest {
Assertions.assertEquals(List.of(container1),
tmPod.getSpec().getContainers());
// Override common
- var common = TestUtils.getTestPod("", "", Collections.emptyList());
+ var common = TestUtils.getTestPodTemplate("", Collections.emptyList());
common.getSpec().setDnsPolicy("test");
flinkDeployment.getSpec().setPodTemplate(common);
@@ -361,17 +368,17 @@ public class FlinkConfigBuilderTest {
configuration.contains(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
}
- private Pod getJmPod(Configuration configuration) throws IOException {
+ private PodTemplateSpec getJmPod(Configuration configuration) throws
IOException {
return OBJECT_MAPPER.readValue(
new
File(configuration.getString(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)),
- Pod.class);
+ PodTemplateSpec.class);
}
- private Pod getTmPod(Configuration configuration) throws IOException {
+ private PodTemplateSpec getTmPod(Configuration configuration) throws
IOException {
return OBJECT_MAPPER.readValue(
new File(
configuration.getString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
- Pod.class);
+ PodTemplateSpec.class);
}
@Test
@@ -402,7 +409,7 @@ public class FlinkConfigBuilderTest {
// We must use deprecated configs for 1.16 and before
deploymentClone.getSpec().setFlinkVersion(FlinkVersion.v1_16);
- deploymentClone.getSpec().setPodTemplate(new Pod());
+ deploymentClone.getSpec().setPodTemplate(new PodTemplateSpec());
Configuration configuration =
new FlinkConfigBuilder(deploymentClone, new Configuration())
.applyJobManagerSpec()
@@ -447,16 +454,13 @@ public class FlinkConfigBuilderTest {
flinkDeployment
.getSpec()
.getJobManager()
- .setPodTemplate(
- TestUtils.getTestPod(
- "pod1 hostname", "pod1 api version", new
ArrayList<>()));
+ .setPodTemplate(TestUtils.getTestPodTemplate("pod1 hostname",
new ArrayList<>()));
configuration =
new FlinkConfigBuilder(flinkDeployment, new Configuration())
.applyPodTemplate()
.build();
- Pod jmPod = getJmPod(configuration);
- assertEquals("pod1 api version", jmPod.getApiVersion());
+ var jmPod = getJmPod(configuration);
assertMainContainerEphemeralStorage(jmPod.getSpec().getContainers().get(0),
"2G");
}
@@ -718,16 +722,13 @@ public class FlinkConfigBuilderTest {
flinkDeployment
.getSpec()
.getTaskManager()
- .setPodTemplate(
- TestUtils.getTestPod(
- "pod2 hostname", "pod2 api version", new
ArrayList<>()));
+ .setPodTemplate(TestUtils.getTestPodTemplate("pod2 hostname",
new ArrayList<>()));
configuration =
new FlinkConfigBuilder(flinkDeployment, new Configuration())
.applyPodTemplate()
.build();
var tmPod = getTmPod(configuration);
- assertEquals("pod2 api version", tmPod.getApiVersion());
assertMainContainerEphemeralStorage(tmPod.getSpec().getContainers().get(0),
"2G");
}
@@ -916,11 +917,11 @@ public class FlinkConfigBuilderTest {
public void testApplyResourceToPodTemplate() {
Resource resource =
flinkDeployment.getSpec().getTaskManager().getResource();
- Pod pod = FlinkConfigBuilder.applyResourceToPodTemplate(null,
resource);
+ var pod = FlinkConfigBuilder.applyResourceToPodTemplate(null,
resource);
assertEquals(Constants.MAIN_CONTAINER_NAME,
pod.getSpec().getContainers().get(0).getName());
assertMainContainerEphemeralStorage(pod.getSpec().getContainers().get(0), "2G");
- Pod podWithMetadata = new Pod();
+ var podWithMetadata = new PodTemplateSpec();
ObjectMeta metaData = new ObjectMeta();
podWithMetadata.setMetadata(metaData);
pod = FlinkConfigBuilder.applyResourceToPodTemplate(podWithMetadata,
resource);
@@ -947,14 +948,13 @@ public class FlinkConfigBuilderTest {
.toString());
}
- private Pod createTestPodWithContainers() {
+ private PodTemplateSpec createTestPodWithContainers() {
Container mainContainer = new Container();
mainContainer.setName(Constants.MAIN_CONTAINER_NAME);
Container sideCarContainer = new Container();
sideCarContainer.setName("sidecar");
- Pod pod =
- TestUtils.getTestPod(
- "hostname", "api version", List.of(mainContainer,
sideCarContainer));
+ var pod =
+ TestUtils.getTestPodTemplate("hostname",
List.of(mainContainer, sideCarContainer));
return pod;
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index 3ec8fa40..c8e575cf 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.utils.Constants;
-import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -135,7 +135,7 @@ public class FlinkConfigManagerTest {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deployment.getSpec().setLogConfiguration(Map.of(Constants.CONFIG_FILE_LOG4J_NAME,
"test"));
- deployment.getSpec().setPodTemplate(new Pod());
+ deployment.getSpec().setPodTemplate(new PodTemplateSpec());
ReconciliationUtils.updateStatusForDeployedSpec(deployment, config);
Configuration deployConfig =
configManager.getObserveConfig(deployment);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
index b0aa7c6f..07bc421b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
@@ -34,7 +34,11 @@ import
org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.HostAlias;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import lombok.Value;
import org.junit.jupiter.api.Test;
@@ -66,7 +70,7 @@ public class SpecDiffTest {
assertEquals(0, diff.getNumDiffs());
left = BaseTestUtils.buildApplicationCluster().getSpec();
- left.setPodTemplate(BaseTestUtils.getTestPod("localhost", "v1",
List.of()));
+ left.setPodTemplate(BaseTestUtils.getTestPodTemplate("localhost",
List.of()));
left.setIngress(IngressSpec.builder().template("template").build());
right = SpecUtils.clone(left);
@@ -120,7 +124,7 @@ public class SpecDiffTest {
right.getJobManager().getResource().setMemory("999m");
right.getTaskManager().setReplicas(999);
- right.getPodTemplate().setApiVersion("v2");
+ right.getPodTemplate().setMetadata(new ObjectMeta());
right.getIngress().setTemplate("none");
diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE,
left, right).build();
@@ -226,9 +230,9 @@ public class SpecDiffTest {
}
@Test
- public void testPodTemplateChanges() {
+ public void testPodTemplateChanges() throws JsonProcessingException {
var left = BaseTestUtils.buildApplicationCluster().getSpec();
- left.setPodTemplate(BaseTestUtils.getTestPod("localhost1", "v1",
List.of()));
+ left.setPodTemplate(BaseTestUtils.getTestPodTemplate("localhost1",
List.of()));
left.getPodTemplate()
.getSpec()
.getHostAliases()
@@ -238,7 +242,7 @@ public class SpecDiffTest {
ingressSpec.setTemplate("temp");
left.setIngress(ingressSpec);
var right = BaseTestUtils.buildApplicationCluster().getSpec();
- right.setPodTemplate(BaseTestUtils.getTestPod("localhost2", "v2",
List.of()));
+ right.setPodTemplate(BaseTestUtils.getTestPodTemplate("localhost2",
List.of()));
right.getPodTemplate()
.getSpec()
.getHostAliases()
@@ -252,11 +256,34 @@ public class SpecDiffTest {
assertEquals(
"Diff: FlinkDeploymentSpec[image : img1 -> img2, "
+ "ingress : {..} -> null, "
- + "podTemplate.apiVersion : v1 -> v2, "
+ "podTemplate.spec.hostAliases.0.hostnames.1 : host2
-> null, "
+ "podTemplate.spec.hostname : localhost1 ->
localhost2, "
+ "restartNonce : null -> 1]",
diff.toString());
+
+ // Make sure removed fields don't trigger upgrade
+ String oldTemplate =
+ "{\"apiVersion\": \"v1\", \"metadata\": {\"labels\" : {\"l1\":
\"v1\"}}, \"spec\": {\"hostname\": \"h\"}}";
+
+ String newTemplate =
+ "{\"metadata\": {\"labels\" : {\"l1\": \"v1\"}}, \"spec\":
{\"hostname\": \"h\"}}";
+
+ var om = new ObjectMapper();
+ left = BaseTestUtils.buildApplicationCluster().getSpec();
+ left.setPodTemplate(om.readValue(oldTemplate, PodTemplateSpec.class));
+ right = BaseTestUtils.buildApplicationCluster().getSpec();
+ right.setPodTemplate(om.readValue(newTemplate, PodTemplateSpec.class));
+
+ diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE,
left, right).build();
+ assertEquals(DiffType.IGNORE, diff.getType());
+
+ left = BaseTestUtils.buildApplicationCluster().getSpec();
+ left.getJobManager().setPodTemplate(om.readValue(oldTemplate,
PodTemplateSpec.class));
+ right = BaseTestUtils.buildApplicationCluster().getSpec();
+ right.getJobManager().setPodTemplate(om.readValue(newTemplate,
PodTemplateSpec.class));
+
+ diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE,
left, right).build();
+ assertEquals(DiffType.IGNORE, diff.getType());
}
@Test
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 23e2204e..a4c97f6d 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -40,7 +40,7 @@ import io.fabric8.kubernetes.api.model.EphemeralVolumeSource;
import io.fabric8.kubernetes.api.model.HTTPGetAction;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.Probe;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -77,21 +77,17 @@ public class FlinkUtilsTest {
Container container2 = new Container();
container2.setName("container2");
- Pod pod1 = TestUtils.getTestPod("pod1 hostname", "pod1 api version",
List.of());
+ var pod1 = TestUtils.getTestPodTemplate("pod1 hostname", List.of());
- Pod pod2 =
- TestUtils.getTestPod(
- "pod2 hostname", "pod2 api version",
List.of(container1, container2));
+ var pod2 = TestUtils.getTestPodTemplate("pod2 hostname",
List.of(container1, container2));
- Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2, false);
-
- assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
+ var mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2, false);
assertEquals(pod2.getSpec().getContainers(),
mergedPod.getSpec().getContainers());
}
@Test
public void testAddStartupProbe() {
- Pod pod = new Pod();
+ var pod = new PodTemplateSpec();
FlinkUtils.addStartupProbe(pod);
Probe expectedProbe = new Probe();
@@ -332,24 +328,19 @@ public class FlinkUtilsTest {
Volume volume3 = new Volume();
volume3.setName("v3");
- Pod pod1 =
- TestUtils.getTestPod(
- "pod1 hostname", "pod1 api version",
List.of(container1, container2));
+ var pod1 = TestUtils.getTestPodTemplate("pod1 hostname",
List.of(container1, container2));
pod1.getSpec().setVolumes(List.of(volume1));
- Pod pod2 =
- TestUtils.getTestPod(
- "pod2 hostname", "pod2 api version",
List.of(container1, container3));
+ var pod2 = TestUtils.getTestPodTemplate("pod2 hostname",
List.of(container1, container3));
pod2.getSpec().setVolumes(List.of(volume12, volume2, volume3));
- Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2, true);
+ var mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2, true);
Volume v1merged = new Volume();
v1merged.setName("v1");
v1merged.setEphemeral(new EphemeralVolumeSource());
v1merged.setEmptyDir(new EmptyDirVolumeSource());
- assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
assertEquals(
List.of(container1, container2, container3),
mergedPod.getSpec().getContainers());
assertEquals(List.of(v1merged, volume2, volume3),
mergedPod.getSpec().getVolumes());