This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 956dec07 [FLINK-31794] Ignore unknown CRD fields
956dec07 is described below
commit 956dec079e6fcfa63ea409b4c076ee88b28f22dc
Author: darenwkt <[email protected]>
AuthorDate: Wed Apr 19 13:29:01 2023 +0100
[FLINK-31794] Ignore unknown CRD fields
---
.../operator/api/spec/FlinkDeploymentSpec.java | 2 +
.../operator/api/spec/FlinkSessionJobSpec.java | 2 +
.../kubernetes/operator/api/spec/IngressSpec.java | 2 +
.../operator/api/spec/JobManagerSpec.java | 2 +
.../kubernetes/operator/api/spec/JobSpec.java | 2 +
.../kubernetes/operator/api/spec/Resource.java | 2 +
.../operator/api/spec/TaskManagerSpec.java | 2 +
.../FlinkDeploymentReconciliationStatus.java | 2 +
.../operator/api/status/FlinkDeploymentStatus.java | 2 +
.../FlinkSessionJobReconciliationStatus.java | 2 +
.../operator/api/status/FlinkSessionJobStatus.java | 2 +
.../kubernetes/operator/api/status/JobStatus.java | 2 +
.../operator/api/status/TaskManagerInfo.java | 2 +
.../validation/CrdCompatibilityCheckerTest.java | 31 +++++++++++++++
.../test-deployment-with-unknown-fields.yaml | 46 ++++++++++++++++++++++
.../src/test/resources/test-deployment.yaml | 40 +++++++++++++++++++
.../test-session-job-with-unknown-fields.yaml | 30 ++++++++++++++
.../src/test/resources/test-session-job.yaml | 27 +++++++++++++
18 files changed, 200 insertions(+)
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
index 423d9baf..78f49cf8 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentSpec.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.api.spec;
import org.apache.flink.annotation.Experimental;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.kubernetes.api.model.Pod;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -37,6 +38,7 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@SuperBuilder
+@JsonIgnoreProperties(ignoreUnknown = true)
public class FlinkDeploymentSpec extends AbstractFlinkSpec {
/** Flink docker image used to start the Job and TaskManager pods. */
private String image;
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkSessionJobSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkSessionJobSpec.java
index a402b9bf..baea7752 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkSessionJobSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkSessionJobSpec.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.api.spec;
import org.apache.flink.annotation.Experimental;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -34,6 +35,7 @@ import lombok.experimental.SuperBuilder;
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@SuperBuilder
+@JsonIgnoreProperties(ignoreUnknown = true)
public class FlinkSessionJobSpec extends AbstractFlinkSpec {
/** The name of the target session cluster deployment. */
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/IngressSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/IngressSpec.java
index da90281c..d0111e92 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/IngressSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/IngressSpec.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.api.spec;
import org.apache.flink.annotation.Experimental;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -32,6 +33,7 @@ import java.util.Map;
@NoArgsConstructor
@AllArgsConstructor
@Builder
+@JsonIgnoreProperties(ignoreUnknown = true)
public class IngressSpec {
/** Ingress template for the JobManager service. */
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
index 26f1dbb6..522a1e08 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.api.spec;
import org.apache.flink.annotation.Experimental;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.kubernetes.api.model.Pod;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -31,6 +32,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder
+@JsonIgnoreProperties(ignoreUnknown = true)
public class JobManagerSpec {
/** Resource specification for the JobManager pods. */
private Resource resource;
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
index 361ad84d..8740701b 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
@@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.diff.Diffable;
import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -35,6 +36,7 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
@Builder
@EqualsAndHashCode
+@JsonIgnoreProperties(ignoreUnknown = true)
public class JobSpec implements Diffable<JobSpec> {
/**
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/Resource.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/Resource.java
index c1125660..96c53eb9 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/Resource.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/Resource.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.api.spec;
import org.apache.flink.annotation.Experimental;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -28,6 +29,7 @@ import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
public class Resource {
/** Amount of CPU allocated to the pod. */
private Double cpu;
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
index 99058de6..0da6c070 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
@@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.diff.Diffable;
import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.model.annotation.SpecReplicas;
import lombok.AllArgsConstructor;
@@ -35,6 +36,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder
+@JsonIgnoreProperties(ignoreUnknown = true)
public class TaskManagerSpec implements Diffable<TaskManagerSpec> {
/** Resource specification for the TaskManager pods. */
private Resource resource;
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentReconciliationStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentReconciliationStatus.java
index 58e71866..82b999c1 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentReconciliationStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentReconciliationStatus.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.api.status;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@@ -29,6 +30,7 @@ import lombok.ToString;
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
+@JsonIgnoreProperties(ignoreUnknown = true)
public class FlinkDeploymentReconciliationStatus extends
ReconciliationStatus<FlinkDeploymentSpec> {
@Override
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
index bc80bfd3..136d3415 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.api.status;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -38,6 +39,7 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@SuperBuilder
+@JsonIgnoreProperties(ignoreUnknown = true)
public class FlinkDeploymentStatus extends CommonStatus<FlinkDeploymentSpec> {
/** Information from running clusters. */
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkSessionJobReconciliationStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkSessionJobReconciliationStatus.java
index f40f1184..e352fd80 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkSessionJobReconciliationStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkSessionJobReconciliationStatus.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.api.status;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -27,6 +28,7 @@ import lombok.ToString;
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
+@JsonIgnoreProperties(ignoreUnknown = true)
public class FlinkSessionJobReconciliationStatus extends
ReconciliationStatus<FlinkSessionJobSpec> {
@Override
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkSessionJobStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkSessionJobStatus.java
index a39eb912..1b0fee70 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkSessionJobStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkSessionJobStatus.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.api.status;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -35,6 +36,7 @@ import lombok.experimental.SuperBuilder;
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@SuperBuilder
+@JsonIgnoreProperties(ignoreUnknown = true)
public class FlinkSessionJobStatus extends CommonStatus<FlinkSessionJobSpec> {
/** Status of the last reconcile operation. */
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
index 9b9e15e9..4037eb11 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.api.status;
import org.apache.flink.annotation.Experimental;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.kubernetes.model.annotation.PrinterColumn;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -31,6 +32,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
+@JsonIgnoreProperties(ignoreUnknown = true)
public class JobStatus {
/** Name of the job. */
private String jobName;
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/TaskManagerInfo.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/TaskManagerInfo.java
index 5215f26f..c5973af5 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/TaskManagerInfo.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/TaskManagerInfo.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.api.status;
import org.apache.flink.annotation.Experimental;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.kubernetes.model.annotation.LabelSelector;
import io.fabric8.kubernetes.model.annotation.StatusReplicas;
import lombok.AllArgsConstructor;
@@ -32,6 +33,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
+@JsonIgnoreProperties(ignoreUnknown = true)
public class TaskManagerInfo {
/** TaskManager label selector. */
@LabelSelector private String labelSelector;
diff --git
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityCheckerTest.java
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityCheckerTest.java
index 9e523a97..43fa325f 100644
---
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityCheckerTest.java
+++
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityCheckerTest.java
@@ -17,11 +17,17 @@
package org.apache.flink.kubernetes.operator.api.validation;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.junit.jupiter.api.Test;
+import java.io.File;
+import java.io.IOException;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@@ -239,6 +245,31 @@ public class CrdCompatibilityCheckerTest {
+ " type: object");
}
+ @Test
+ public void testCreateFlinkSessionJobIgnoreUnknownFields() throws
IOException {
+ FlinkSessionJob flinkSessionJobWithUnknownFields =
+ objectMapper.readValue(
+ new
File("src/test/resources/test-session-job-with-unknown-fields.yaml"),
+ FlinkSessionJob.class);
+ FlinkSessionJob flinkSessionJob =
+ objectMapper.readValue(
+ new File("src/test/resources/test-session-job.yaml"),
+ FlinkSessionJob.class);
+ assertEquals(flinkSessionJobWithUnknownFields.toString(),
flinkSessionJob.toString());
+ }
+
+ @Test
+ public void testCreateFlinkDeploymentIgnoreUnknownFields() throws
IOException {
+ FlinkDeployment flinkDeploymentWithUnknownFields =
+ objectMapper.readValue(
+ new
File("src/test/resources/test-deployment-with-unknown-fields.yaml"),
+ FlinkDeployment.class);
+ FlinkDeployment flinkDeployment =
+ objectMapper.readValue(
+ new File("src/test/resources/test-deployment.yaml"),
FlinkDeployment.class);
+ assertEquals(flinkDeploymentWithUnknownFields.toString(),
flinkDeployment.toString());
+ }
+
private void expectSuccess(String oldSchema, String newSchema) throws
JsonProcessingException {
var oldNode = objectMapper.readTree(oldSchema).get("openAPIV3Schema");
var newNode = objectMapper.readTree(newSchema).get("openAPIV3Schema");
diff --git
a/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
b/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
new file mode 100644
index 00000000..e130046a
--- /dev/null
+++
b/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
@@ -0,0 +1,46 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+ name: basic-example
+spec:
+ image: flink:1.16
+ flinkVersion: v1_16
+ flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
+ serviceAccount: flink
+ jobManager:
+ resource:
+ memory: "2048m"
+ cpu: 1
+ unknownField: testUnknownField
+ unknownField: testUnknownField
+ taskManager:
+ resource:
+ memory: "2048m"
+ cpu: 1
+ unknownField: testUnknownField
+ unknownField: testUnknownField
+ job:
+ jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ parallelism: 2
+ upgradeMode: stateless
+ unknownField: testUnknownField
+ unknownField: testUnknownField
\ No newline at end of file
diff --git
a/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
b/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
new file mode 100644
index 00000000..2cee8ab3
--- /dev/null
+++ b/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
@@ -0,0 +1,40 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+ name: basic-example
+spec:
+ image: flink:1.16
+ flinkVersion: v1_16
+ flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
+ serviceAccount: flink
+ jobManager:
+ resource:
+ memory: "2048m"
+ cpu: 1
+ taskManager:
+ resource:
+ memory: "2048m"
+ cpu: 1
+ job:
+ jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ parallelism: 2
+ upgradeMode: stateless
diff --git
a/flink-kubernetes-operator-api/src/test/resources/test-session-job-with-unknown-fields.yaml
b/flink-kubernetes-operator-api/src/test/resources/test-session-job-with-unknown-fields.yaml
new file mode 100644
index 00000000..1e28385e
--- /dev/null
+++
b/flink-kubernetes-operator-api/src/test/resources/test-session-job-with-unknown-fields.yaml
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkSessionJob
+metadata:
+ name: basic-session-job-only-example
+spec:
+ deploymentName: basic-session-deployment-only-example
+ job:
+ jarURI:
https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.3/flink-examples-streaming_2.12-1.15.3-TopSpeedWindowing.jar
+ parallelism: 4
+ upgradeMode: stateless
+ unknownField: testUnknownField
+ unknownField: testUnknownField
+
diff --git
a/flink-kubernetes-operator-api/src/test/resources/test-session-job.yaml
b/flink-kubernetes-operator-api/src/test/resources/test-session-job.yaml
new file mode 100644
index 00000000..e6d8123c
--- /dev/null
+++ b/flink-kubernetes-operator-api/src/test/resources/test-session-job.yaml
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkSessionJob
+metadata:
+ name: basic-session-job-only-example
+spec:
+ deploymentName: basic-session-deployment-only-example
+ job:
+ jarURI:
https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.3/flink-examples-streaming_2.12-1.15.3-TopSpeedWindowing.jar
+ parallelism: 4
+ upgradeMode: stateless