This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 24643c8c [FLINK-31220] Replace Pod with PodTemplateSpec for the pod template properties
24643c8c is described below

commit 24643c8c6d9d734732ed2cb7e3112c4452675f40
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Feb 6 21:49:21 2024 +0100

    [FLINK-31220] Replace Pod with PodTemplateSpec for the pod template properties
---
 docs/content/docs/custom-resource/pod-template.md  |  8 ----
 docs/content/docs/custom-resource/reference.md     |  6 +--
 e2e-tests/data/multi-sessionjob.yaml               |  4 --
 examples/pod-template.yaml                         |  8 ----
 .../operator/api/spec/FlinkDeploymentSpec.java     |  5 +-
 .../operator/api/spec/JobManagerSpec.java          |  5 +-
 .../operator/api/spec/TaskManagerSpec.java         |  5 +-
 .../operator/api/utils/BaseTestUtils.java          | 15 ++++--
 .../operator/config/FlinkConfigBuilder.java        | 17 +++----
 .../reconciler/diff/ReflectiveDiffBuilder.java     | 46 ++++++++++++++++--
 .../kubernetes/operator/utils/FlinkUtils.java      |  9 ++--
 .../operator/config/FlinkConfigBuilderTest.java    | 54 +++++++++++-----------
 .../operator/config/FlinkConfigManagerTest.java    |  4 +-
 .../operator/reconciler/diff/SpecDiffTest.java     | 39 +++++++++++++---
 .../kubernetes/operator/utils/FlinkUtilsTest.java  | 25 ++++------
 15 files changed, 152 insertions(+), 98 deletions(-)

diff --git a/docs/content/docs/custom-resource/pod-template.md 
b/docs/content/docs/custom-resource/pod-template.md
index 6d704c92..bf7097e5 100644
--- a/docs/content/docs/custom-resource/pod-template.md
+++ b/docs/content/docs/custom-resource/pod-template.md
@@ -55,10 +55,6 @@ spec:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
   podTemplate:
-    apiVersion: v1
-    kind: Pod
-    metadata:
-      name: pod-template
     spec:
       containers:
         # Do not change the main container name
@@ -85,10 +81,6 @@ spec:
       memory: "2048m"
       cpu: 1
     podTemplate:
-      apiVersion: v1
-      kind: Pod
-      metadata:
-        name: task-manager-pod-template
       spec:
         initContainers:
           # Sample sidecar container
diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index c7810eb2..9db6371e 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -53,7 +53,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | serviceAccount | java.lang.String | Kubernetes service used by the Flink 
deployment. |
 | flinkVersion | org.apache.flink.kubernetes.operator.api.spec.FlinkVersion | 
Flink image version. |
 | ingress | org.apache.flink.kubernetes.operator.api.spec.IngressSpec | 
Ingress specs. |
-| podTemplate | io.fabric8.kubernetes.api.model.Pod | Base pod template for 
job and task manager pods. Can be overridden by the jobManager and taskManager 
pod templates. |
+| podTemplate | io.fabric8.kubernetes.api.model.PodTemplateSpec | Base pod 
template for job and task manager pods. Can be overridden by the jobManager and 
taskManager pod templates. |
 | jobManager | org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec | 
JobManager specs. |
 | taskManager | org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec 
| TaskManager specs. |
 | logConfiguration | java.util.Map<java.lang.String,java.lang.String> | Log 
configuration overrides for the Flink deployment. Format logConfigFileName -> 
configContent. |
@@ -108,7 +108,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | ----------| ---- | ---- |
 | resource | org.apache.flink.kubernetes.operator.api.spec.Resource | Resource 
specification for the JobManager pods. |
 | replicas | int | Number of JobManager replicas. Must be 1 for non-HA 
deployments. |
-| podTemplate | io.fabric8.kubernetes.api.model.Pod | JobManager pod template. 
It will be merged with FlinkDeploymentSpec.podTemplate. |
+| podTemplate | io.fabric8.kubernetes.api.model.PodTemplateSpec | JobManager 
pod template. It will be merged with FlinkDeploymentSpec.podTemplate. |
 
 ### JobSpec
 **Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec
@@ -169,7 +169,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | ----------| ---- | ---- |
 | resource | org.apache.flink.kubernetes.operator.api.spec.Resource | Resource 
specification for the TaskManager pods. |
 | replicas | java.lang.Integer | Number of TaskManager replicas. If defined, 
takes precedence over parallelism |
-| podTemplate | io.fabric8.kubernetes.api.model.Pod | TaskManager pod 
template. It will be merged with FlinkDeploymentSpec.podTemplate. |
+| podTemplate | io.fabric8.kubernetes.api.model.PodTemplateSpec | TaskManager 
pod template. It will be merged with FlinkDeploymentSpec.podTemplate. |
 
 ### UpgradeMode
 **Class**: org.apache.flink.kubernetes.operator.api.spec.UpgradeMode
diff --git a/e2e-tests/data/multi-sessionjob.yaml 
b/e2e-tests/data/multi-sessionjob.yaml
index fd30c838..d10b6c11 100644
--- a/e2e-tests/data/multi-sessionjob.yaml
+++ b/e2e-tests/data/multi-sessionjob.yaml
@@ -85,10 +85,6 @@ spec:
     state.savepoints.dir: file:///opt/flink/volume/flink-sp
   serviceAccount: flink
   podTemplate:
-    apiVersion: v1
-    kind: Pod
-    metadata:
-      name: pod-template
     spec:
       containers:
         # Do not change the main container name
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index 17ec3c8a..bab35d4a 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -27,10 +27,6 @@ spec:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
   podTemplate:
-    apiVersion: v1
-    kind: Pod
-    metadata:
-      name: pod-template
     spec:
       containers:
         # Do not change the main container name
@@ -57,10 +53,6 @@ spec:
       memory: "2048m"
       cpu: 1
     podTemplate:
-      apiVersion: v1
-      kind: Pod
-      metadata:
-        name: task-manager-pod-template
       spec:
         initContainers:
           # Sample init container for fetching remote artifacts
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
index 78f49cf8..29e40906 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator.api.spec;
 import org.apache.flink.annotation.Experimental;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.crd.generator.annotation.SchemaFrom;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -59,7 +61,8 @@ public class FlinkDeploymentSpec extends AbstractFlinkSpec {
      * Base pod template for job and task manager pods. Can be overridden by 
the jobManager and
      * taskManager pod templates.
      */
-    private Pod podTemplate;
+    @SchemaFrom(type = Pod.class)
+    private PodTemplateSpec podTemplate;
 
     /** JobManager specs. */
     private JobManagerSpec jobManager;
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
index 522a1e08..658d48fe 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator.api.spec;
 import org.apache.flink.annotation.Experimental;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.crd.generator.annotation.SchemaFrom;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -41,5 +43,6 @@ public class JobManagerSpec {
     private int replicas = 1;
 
     /** JobManager pod template. It will be merged with 
FlinkDeploymentSpec.podTemplate. */
-    private Pod podTemplate;
+    @SchemaFrom(type = Pod.class)
+    private PodTemplateSpec podTemplate;
 }
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
index 8758ebad..861303f4 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
@@ -23,7 +23,9 @@ import org.apache.flink.kubernetes.operator.api.diff.Diffable;
 import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.crd.generator.annotation.SchemaFrom;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import io.fabric8.kubernetes.model.annotation.SpecReplicas;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -47,5 +49,6 @@ public class TaskManagerSpec implements 
Diffable<TaskManagerSpec> {
     private Integer replicas;
 
     /** TaskManager pod template. It will be merged with 
FlinkDeploymentSpec.podTemplate. */
-    private Pod podTemplate;
+    @SchemaFrom(type = Pod.class)
+    private PodTemplateSpec podTemplate;
 }
diff --git 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index 89724023..8e887766 100644
--- 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++ 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -39,6 +39,7 @@ import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 
 import java.time.Instant;
 import java.util.HashMap;
@@ -185,13 +186,21 @@ public class BaseTestUtils {
                 .build();
     }
 
-    public static Pod getTestPod(String hostname, String apiVersion, 
List<Container> containers) {
+    public static PodTemplateSpec getTestPodTemplate(String hostname, 
List<Container> containers) {
         final PodSpec podSpec = new PodSpec();
         podSpec.setHostname(hostname);
         podSpec.setContainers(containers);
-        final Pod pod = new Pod();
-        pod.setApiVersion(apiVersion);
+        var pod = new PodTemplateSpec();
         pod.setSpec(podSpec);
         return pod;
     }
+
+    public static Pod getTestPod(String hostname, String apiVersion, 
List<Container> containers) {
+        var pod = new Pod();
+        var podTemplate = getTestPodTemplate(hostname, containers);
+        pod.setApiVersion(apiVersion);
+        pod.setSpec(podTemplate.getSpec());
+        pod.setMetadata(podTemplate.getMetadata());
+        return pod;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 6caa46a3..339dfaa3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -52,8 +52,8 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import io.fabric8.kubernetes.api.model.Quantity;
 import io.fabric8.kubernetes.api.model.ResourceRequirements;
 import io.fabric8.kubernetes.client.utils.Serialization;
@@ -189,11 +189,11 @@ public class FlinkConfigBuilder {
     }
 
     protected FlinkConfigBuilder applyPodTemplate() throws IOException {
-        Pod commonPodTemplate = spec.getPodTemplate();
+        PodTemplateSpec commonPodTemplate = spec.getPodTemplate();
         boolean mergeByName =
                 
effectiveConfig.get(KubernetesOperatorConfigOptions.POD_TEMPLATE_MERGE_BY_NAME);
 
-        Pod jmPodTemplate;
+        PodTemplateSpec jmPodTemplate;
         if (spec.getJobManager() != null) {
             jmPodTemplate =
                     mergePodTemplates(
@@ -208,7 +208,7 @@ public class FlinkConfigBuilder {
         if (effectiveConfig.get(
                 
KubernetesOperatorConfigOptions.OPERATOR_JM_STARTUP_PROBE_ENABLED)) {
             if (jmPodTemplate == null) {
-                jmPodTemplate = new Pod();
+                jmPodTemplate = new PodTemplateSpec();
             }
             FlinkUtils.addStartupProbe(jmPodTemplate);
         }
@@ -219,7 +219,7 @@ public class FlinkConfigBuilder {
             
effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, 
jmTemplateFile);
         }
 
-        Pod tmPodTemplate;
+        PodTemplateSpec tmPodTemplate;
         if (spec.getTaskManager() != null) {
             tmPodTemplate =
                     mergePodTemplates(
@@ -488,14 +488,15 @@ public class FlinkConfigBuilder {
     }
 
     @VisibleForTesting
-    protected static Pod applyResourceToPodTemplate(Pod podTemplate, Resource 
resource) {
+    protected static PodTemplateSpec applyResourceToPodTemplate(
+            PodTemplateSpec podTemplate, Resource resource) {
         if (resource == null
                 || 
StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())) {
             return podTemplate;
         }
 
         if (podTemplate == null) {
-            Pod newPodTemplate = new Pod();
+            var newPodTemplate = new PodTemplateSpec();
             newPodTemplate.setSpec(createPodSpecWithResource(resource));
             return newPodTemplate;
         } else if (podTemplate.getSpec() == null) {
@@ -568,7 +569,7 @@ public class FlinkConfigBuilder {
         return tmpDir.getAbsolutePath();
     }
 
-    private static String createTempFile(Pod podTemplate) throws IOException {
+    private static String createTempFile(PodTemplateSpec podTemplate) throws 
IOException {
         final File tmp = File.createTempFile(GENERATED_FILE_PREFIX + 
"podTemplate_", ".yaml");
         Files.write(tmp.toPath(), 
Serialization.asYaml(podTemplate).getBytes());
         tmp.deleteOnExit();
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
index 2a1fed4e..5ab52d7f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
@@ -21,8 +21,10 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.kubernetes.operator.api.diff.DiffType;
 import org.apache.flink.kubernetes.operator.api.diff.Diffable;
 import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import lombok.NonNull;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.ObjectUtils;
@@ -61,6 +63,8 @@ public class ReflectiveDiffBuilder<T> implements 
Builder<DiffResult<T>> {
         this.before = before;
         this.after = after;
         diffBuilder = new DiffBuilder<>(before, after);
+        clearIgnoredFields(before);
+        clearIgnoredFields(after);
     }
 
     @Override
@@ -79,6 +83,7 @@ public class ReflectiveDiffBuilder<T> implements 
Builder<DiffResult<T>> {
                 try {
                     var leftField = readField(field, before, true);
                     var rightField = readField(field, after, true);
+
                     if (field.isAnnotationPresent(SpecDiff.Config.class)
                             && Map.class.isAssignableFrom(field.getType())) {
                         diffBuilder.append(
@@ -112,11 +117,7 @@ public class ReflectiveDiffBuilder<T> implements 
Builder<DiffResult<T>> {
                                         .build());
 
                     } else {
-                        diffBuilder.append(
-                                field.getName(),
-                                readField(field, before, true),
-                                readField(field, after, true),
-                                UPGRADE);
+                        diffBuilder.append(field.getName(), leftField, 
rightField, UPGRADE);
                     }
 
                 } catch (final IllegalAccessException ex) {
@@ -172,4 +173,39 @@ public class ReflectiveDiffBuilder<T> implements 
Builder<DiffResult<T>> {
         }
         return diffType;
     }
+
+    /**
+     * This method is responsible for clearing / nulling out deprecated fields 
that should be
+     * ignored during spec diff comparison. These fields may still be present 
in the
+     * lastReconciledSpec.
+     *
+     * @param o Object to be cleaned.
+     */
+    private static void clearIgnoredFields(Object o) {
+        if (o == null) {
+            return;
+        }
+        if (o instanceof FlinkDeploymentSpec) {
+            var spec = (FlinkDeploymentSpec) o;
+            clearPodTemplateAdditionalProps(spec.getPodTemplate());
+            if (spec.getJobManager() != null) {
+                
clearPodTemplateAdditionalProps(spec.getJobManager().getPodTemplate());
+            }
+            if (spec.getTaskManager() != null) {
+                
clearPodTemplateAdditionalProps(spec.getTaskManager().getPodTemplate());
+            }
+        }
+    }
+
+    /**
+     * Remove additional props from deserialized PodTemplateSpec which could 
still be there when we
+     * moved Pod -> PodTemplateSpec.
+     *
+     * @param o Object to be cleaned.
+     */
+    private static void clearPodTemplateAdditionalProps(Object o) {
+        if (o != null && o instanceof PodTemplateSpec) {
+            ((PodTemplateSpec) o).setAdditionalProperties(null);
+        }
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index cd12f056..10288c70 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -41,8 +41,8 @@ import io.fabric8.kubernetes.api.model.ConfigMapList;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.HTTPGetAction;
 import io.fabric8.kubernetes.api.model.IntOrString;
-import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import io.fabric8.kubernetes.api.model.Probe;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
@@ -69,7 +69,8 @@ public class FlinkUtils {
 
     public static final String CR_GENERATION_LABEL = 
"flinkdeployment.flink.apache.org/generation";
 
-    public static Pod mergePodTemplates(Pod toPod, Pod fromPod, boolean 
mergeArraysByName) {
+    public static PodTemplateSpec mergePodTemplates(
+            PodTemplateSpec toPod, PodTemplateSpec fromPod, boolean 
mergeArraysByName) {
         if (fromPod == null) {
             return ReconciliationUtils.clone(toPod);
         } else if (toPod == null) {
@@ -79,7 +80,7 @@ public class FlinkUtils {
         JsonNode node2 = MAPPER.valueToTree(fromPod);
         mergeInto(node1, node2, mergeArraysByName);
         try {
-            return MAPPER.treeToValue(node1, Pod.class);
+            return MAPPER.treeToValue(node1, PodTemplateSpec.class);
         } catch (Exception ex) {
             throw new RuntimeException(ex);
         }
@@ -152,7 +153,7 @@ public class FlinkUtils {
         return out;
     }
 
-    public static void addStartupProbe(Pod pod) {
+    public static void addStartupProbe(PodTemplateSpec pod) {
         var spec = pod.getSpec();
         if (spec == null) {
             spec = new PodSpec();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 4c905c3f..04a7c842 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -52,7 +52,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -209,7 +209,9 @@ public class FlinkConfigBuilderTest {
         mainContainer.setName(Constants.MAIN_CONTAINER_NAME);
         mainContainer.setImage("test");
 
-        flinkDeployment.getSpec().setPodTemplate(TestUtils.getTestPod("", "", 
List.of(container0)));
+        flinkDeployment
+                .getSpec()
+                .setPodTemplate(TestUtils.getTestPodTemplate("", 
List.of(container0)));
 
         var inConfig = new Configuration();
         
inConfig.set(KubernetesOperatorConfigOptions.OPERATOR_JM_STARTUP_PROBE_ENABLED, 
false);
@@ -227,18 +229,22 @@ public class FlinkConfigBuilderTest {
                 List.of(container0), 
getJmPod(configuration).getSpec().getContainers());
 
         
flinkDeployment.getSpec().getJobManager().getResource().setEphemeralStorage("2G");
-        
flinkDeployment.getSpec().getTaskManager().getResource().setEphemeralStorage("2G");
+        
flinkDeployment.getSpec().getTaskManager().getResource().setEphemeralStorage("3G");
 
         configuration =
                 new FlinkConfigBuilder(flinkDeployment, inConfig.clone())
                         .applyPodTemplate()
                         .build();
+
+        Assertions.assertNotEquals(
+                
configuration.getString(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE),
+                
configuration.getString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
         assertMainContainerEphemeralStorage(
                 getJmPod(configuration).getSpec().getContainers().get(1), 
"2G");
         Assertions.assertEquals(
                 container0, 
getJmPod(configuration).getSpec().getContainers().get(0));
         assertMainContainerEphemeralStorage(
-                getTmPod(configuration).getSpec().getContainers().get(1), 
"2G");
+                getTmPod(configuration).getSpec().getContainers().get(1), 
"3G");
         Assertions.assertEquals(
                 container0, 
getTmPod(configuration).getSpec().getContainers().get(0));
 
@@ -283,11 +289,12 @@ public class FlinkConfigBuilderTest {
         flinkDeployment
                 .getSpec()
                 .getJobManager()
-                .setPodTemplate(TestUtils.getTestPod("", "", 
List.of(mainContainer, container0)));
+                .setPodTemplate(
+                        TestUtils.getTestPodTemplate("", 
List.of(mainContainer, container0)));
         flinkDeployment
                 .getSpec()
                 .getTaskManager()
-                .setPodTemplate(TestUtils.getTestPod("", "", 
List.of(container1)));
+                .setPodTemplate(TestUtils.getTestPodTemplate("", 
List.of(container1)));
 
         configuration =
                 new FlinkConfigBuilder(flinkDeployment, inConfig.clone())
@@ -308,7 +315,7 @@ public class FlinkConfigBuilderTest {
         Assertions.assertEquals(List.of(container1), 
tmPod.getSpec().getContainers());
 
         // Override common
-        var common = TestUtils.getTestPod("", "", Collections.emptyList());
+        var common = TestUtils.getTestPodTemplate("", Collections.emptyList());
         common.getSpec().setDnsPolicy("test");
         flinkDeployment.getSpec().setPodTemplate(common);
 
@@ -361,17 +368,17 @@ public class FlinkConfigBuilderTest {
                 
configuration.contains(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
     }
 
-    private Pod getJmPod(Configuration configuration) throws IOException {
+    private PodTemplateSpec getJmPod(Configuration configuration) throws 
IOException {
         return OBJECT_MAPPER.readValue(
                 new 
File(configuration.getString(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)),
-                Pod.class);
+                PodTemplateSpec.class);
     }
 
-    private Pod getTmPod(Configuration configuration) throws IOException {
+    private PodTemplateSpec getTmPod(Configuration configuration) throws 
IOException {
         return OBJECT_MAPPER.readValue(
                 new File(
                         
configuration.getString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
-                Pod.class);
+                PodTemplateSpec.class);
     }
 
     @Test
@@ -402,7 +409,7 @@ public class FlinkConfigBuilderTest {
 
         // We must use deprecated configs for 1.16 and before
         deploymentClone.getSpec().setFlinkVersion(FlinkVersion.v1_16);
-        deploymentClone.getSpec().setPodTemplate(new Pod());
+        deploymentClone.getSpec().setPodTemplate(new PodTemplateSpec());
         Configuration configuration =
                 new FlinkConfigBuilder(deploymentClone, new Configuration())
                         .applyJobManagerSpec()
@@ -447,16 +454,13 @@ public class FlinkConfigBuilderTest {
         flinkDeployment
                 .getSpec()
                 .getJobManager()
-                .setPodTemplate(
-                        TestUtils.getTestPod(
-                                "pod1 hostname", "pod1 api version", new 
ArrayList<>()));
+                .setPodTemplate(TestUtils.getTestPodTemplate("pod1 hostname", 
new ArrayList<>()));
         configuration =
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyPodTemplate()
                         .build();
 
-        Pod jmPod = getJmPod(configuration);
-        assertEquals("pod1 api version", jmPod.getApiVersion());
+        var jmPod = getJmPod(configuration);
         
assertMainContainerEphemeralStorage(jmPod.getSpec().getContainers().get(0), 
"2G");
     }
 
@@ -718,16 +722,13 @@ public class FlinkConfigBuilderTest {
         flinkDeployment
                 .getSpec()
                 .getTaskManager()
-                .setPodTemplate(
-                        TestUtils.getTestPod(
-                                "pod2 hostname", "pod2 api version", new 
ArrayList<>()));
+                .setPodTemplate(TestUtils.getTestPodTemplate("pod2 hostname", 
new ArrayList<>()));
         configuration =
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyPodTemplate()
                         .build();
 
         var tmPod = getTmPod(configuration);
-        assertEquals("pod2 api version", tmPod.getApiVersion());
         
assertMainContainerEphemeralStorage(tmPod.getSpec().getContainers().get(0), 
"2G");
     }
 
@@ -916,11 +917,11 @@ public class FlinkConfigBuilderTest {
     public void testApplyResourceToPodTemplate() {
         Resource resource = 
flinkDeployment.getSpec().getTaskManager().getResource();
 
-        Pod pod = FlinkConfigBuilder.applyResourceToPodTemplate(null, 
resource);
+        var pod = FlinkConfigBuilder.applyResourceToPodTemplate(null, 
resource);
         assertEquals(Constants.MAIN_CONTAINER_NAME, 
pod.getSpec().getContainers().get(0).getName());
         
assertMainContainerEphemeralStorage(pod.getSpec().getContainers().get(0), "2G");
 
-        Pod podWithMetadata = new Pod();
+        var podWithMetadata = new PodTemplateSpec();
         ObjectMeta metaData = new ObjectMeta();
         podWithMetadata.setMetadata(metaData);
         pod = FlinkConfigBuilder.applyResourceToPodTemplate(podWithMetadata, 
resource);
@@ -947,14 +948,13 @@ public class FlinkConfigBuilderTest {
                         .toString());
     }
 
-    private Pod createTestPodWithContainers() {
+    private PodTemplateSpec createTestPodWithContainers() {
         Container mainContainer = new Container();
         mainContainer.setName(Constants.MAIN_CONTAINER_NAME);
         Container sideCarContainer = new Container();
         sideCarContainer.setName("sidecar");
-        Pod pod =
-                TestUtils.getTestPod(
-                        "hostname", "api version", List.of(mainContainer, 
sideCarContainer));
+        var pod =
+                TestUtils.getTestPodTemplate("hostname", 
List.of(mainContainer, sideCarContainer));
         return pod;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index 3ec8fa40..c8e575cf 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 
-import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -135,7 +135,7 @@ public class FlinkConfigManagerTest {
 
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         
deployment.getSpec().setLogConfiguration(Map.of(Constants.CONFIG_FILE_LOG4J_NAME,
 "test"));
-        deployment.getSpec().setPodTemplate(new Pod());
+        deployment.getSpec().setPodTemplate(new PodTemplateSpec());
 
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, config);
         Configuration deployConfig = 
configManager.getObserveConfig(deployment);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
index b0aa7c6f..07bc421b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
@@ -34,7 +34,11 @@ import 
org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HostAlias;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import lombok.Value;
 import org.junit.jupiter.api.Test;
 
@@ -66,7 +70,7 @@ public class SpecDiffTest {
         assertEquals(0, diff.getNumDiffs());
 
         left = BaseTestUtils.buildApplicationCluster().getSpec();
-        left.setPodTemplate(BaseTestUtils.getTestPod("localhost", "v1", 
List.of()));
+        left.setPodTemplate(BaseTestUtils.getTestPodTemplate("localhost", 
List.of()));
         left.setIngress(IngressSpec.builder().template("template").build());
 
         right = SpecUtils.clone(left);
@@ -120,7 +124,7 @@ public class SpecDiffTest {
 
         right.getJobManager().getResource().setMemory("999m");
         right.getTaskManager().setReplicas(999);
-        right.getPodTemplate().setApiVersion("v2");
+        right.getPodTemplate().setMetadata(new ObjectMeta());
         right.getIngress().setTemplate("none");
 
         diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE, 
left, right).build();
@@ -226,9 +230,9 @@ public class SpecDiffTest {
     }
 
     @Test
-    public void testPodTemplateChanges() {
+    public void testPodTemplateChanges() throws JsonProcessingException {
         var left = BaseTestUtils.buildApplicationCluster().getSpec();
-        left.setPodTemplate(BaseTestUtils.getTestPod("localhost1", "v1", 
List.of()));
+        left.setPodTemplate(BaseTestUtils.getTestPodTemplate("localhost1", 
List.of()));
         left.getPodTemplate()
                 .getSpec()
                 .getHostAliases()
@@ -238,7 +242,7 @@ public class SpecDiffTest {
         ingressSpec.setTemplate("temp");
         left.setIngress(ingressSpec);
         var right = BaseTestUtils.buildApplicationCluster().getSpec();
-        right.setPodTemplate(BaseTestUtils.getTestPod("localhost2", "v2", 
List.of()));
+        right.setPodTemplate(BaseTestUtils.getTestPodTemplate("localhost2", 
List.of()));
         right.getPodTemplate()
                 .getSpec()
                 .getHostAliases()
@@ -252,11 +256,34 @@ public class SpecDiffTest {
         assertEquals(
                 "Diff: FlinkDeploymentSpec[image : img1 -> img2, "
                         + "ingress : {..} -> null, "
-                        + "podTemplate.apiVersion : v1 -> v2, "
                         + "podTemplate.spec.hostAliases.0.hostnames.1 : host2 
-> null, "
                         + "podTemplate.spec.hostname : localhost1 -> 
localhost2, "
                         + "restartNonce : null -> 1]",
                 diff.toString());
+
+        // Make sure removed fields don't trigger upgrade
+        String oldTemplate =
+                "{\"apiVersion\": \"v1\", \"metadata\": {\"labels\" : {\"l1\": 
\"v1\"}}, \"spec\": {\"hostname\": \"h\"}}";
+
+        String newTemplate =
+                "{\"metadata\": {\"labels\" : {\"l1\": \"v1\"}}, \"spec\": 
{\"hostname\": \"h\"}}";
+
+        var om = new ObjectMapper();
+        left = BaseTestUtils.buildApplicationCluster().getSpec();
+        left.setPodTemplate(om.readValue(oldTemplate, PodTemplateSpec.class));
+        right = BaseTestUtils.buildApplicationCluster().getSpec();
+        right.setPodTemplate(om.readValue(newTemplate, PodTemplateSpec.class));
+
+        diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE, 
left, right).build();
+        assertEquals(DiffType.IGNORE, diff.getType());
+
+        left = BaseTestUtils.buildApplicationCluster().getSpec();
+        left.getJobManager().setPodTemplate(om.readValue(oldTemplate, 
PodTemplateSpec.class));
+        right = BaseTestUtils.buildApplicationCluster().getSpec();
+        right.getJobManager().setPodTemplate(om.readValue(newTemplate, 
PodTemplateSpec.class));
+
+        diff = new ReflectiveDiffBuilder<>(KubernetesDeploymentMode.NATIVE, 
left, right).build();
+        assertEquals(DiffType.IGNORE, diff.getType());
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 23e2204e..a4c97f6d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -40,7 +40,7 @@ import io.fabric8.kubernetes.api.model.EphemeralVolumeSource;
 import io.fabric8.kubernetes.api.model.HTTPGetAction;
 import io.fabric8.kubernetes.api.model.IntOrString;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import io.fabric8.kubernetes.api.model.Probe;
 import io.fabric8.kubernetes.api.model.Volume;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -77,21 +77,17 @@ public class FlinkUtilsTest {
         Container container2 = new Container();
         container2.setName("container2");
 
-        Pod pod1 = TestUtils.getTestPod("pod1 hostname", "pod1 api version", 
List.of());
+        var pod1 = TestUtils.getTestPodTemplate("pod1 hostname", List.of());
 
-        Pod pod2 =
-                TestUtils.getTestPod(
-                        "pod2 hostname", "pod2 api version", 
List.of(container1, container2));
+        var pod2 = TestUtils.getTestPodTemplate("pod2 hostname", 
List.of(container1, container2));
 
-        Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2, false);
-
-        assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
+        var mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2, false);
         assertEquals(pod2.getSpec().getContainers(), 
mergedPod.getSpec().getContainers());
     }
 
     @Test
     public void testAddStartupProbe() {
-        Pod pod = new Pod();
+        var pod = new PodTemplateSpec();
         FlinkUtils.addStartupProbe(pod);
 
         Probe expectedProbe = new Probe();
@@ -332,24 +328,19 @@ public class FlinkUtilsTest {
         Volume volume3 = new Volume();
         volume3.setName("v3");
 
-        Pod pod1 =
-                TestUtils.getTestPod(
-                        "pod1 hostname", "pod1 api version", 
List.of(container1, container2));
+        var pod1 = TestUtils.getTestPodTemplate("pod1 hostname", 
List.of(container1, container2));
         pod1.getSpec().setVolumes(List.of(volume1));
 
-        Pod pod2 =
-                TestUtils.getTestPod(
-                        "pod2 hostname", "pod2 api version", 
List.of(container1, container3));
+        var pod2 = TestUtils.getTestPodTemplate("pod2 hostname", 
List.of(container1, container3));
         pod2.getSpec().setVolumes(List.of(volume12, volume2, volume3));
 
-        Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2, true);
+        var mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2, true);
 
         Volume v1merged = new Volume();
         v1merged.setName("v1");
         v1merged.setEphemeral(new EphemeralVolumeSource());
         v1merged.setEmptyDir(new EmptyDirVolumeSource());
 
-        assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
         assertEquals(
                 List.of(container1, container2, container3), 
mergedPod.getSpec().getContainers());
         assertEquals(List.of(v1merged, volume2, volume3), 
mergedPod.getSpec().getVolumes());


Reply via email to