This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 20b86883ca07e886cefd6ff149444f33fd36b961
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Jun 5 09:40:09 2023 +0200

    [FLINK-32240] Upgrade Flink to 1.17.1
---
 .github/workflows/ci.yml                           |   4 +-
 .../content/docs/custom-resource/job-management.md |   4 +-
 docs/content/docs/custom-resource/overview.md      |   4 +-
 docs/content/docs/custom-resource/pod-template.md  |   4 +-
 docs/content/docs/operations/ingress.md            |   4 +-
 e2e-tests/data/flinkdep-cr.yaml                    |   4 +-
 e2e-tests/data/multi-sessionjob.yaml               |   8 +-
 e2e-tests/data/sessionjob-cr.yaml                  |   4 +-
 examples/advanced-ingress.yaml                     |   4 +-
 examples/basic-checkpoint-ha.yaml                  |   4 +-
 examples/basic-ingress.yaml                        |   4 +-
 examples/basic-reactive.yaml                       |   4 +-
 examples/basic-session-deployment-and-job.yaml     |   4 +-
 examples/basic-session-deployment-only.yaml        |   4 +-
 examples/basic.yaml                                |   4 +-
 examples/custom-logging.yaml                       |   4 +-
 examples/flink-python-example/Dockerfile           |   2 +-
 examples/pod-template.yaml                         |   4 +-
 flink-kubernetes-docs/pom.xml                      |   1 -
 flink-kubernetes-operator-api/pom.xml              |   1 -
 .../operator/api/utils/BaseTestUtils.java          |   6 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../test-deployment-with-unknown-fields.yaml       |   4 +-
 .../src/test/resources/test-deployment.yaml        |   4 +-
 flink-kubernetes-operator-autoscaler/pom.xml       |   1 -
 .../operator/autoscaler/ScalingExecutor.java       |  12 +--
 .../operator/autoscaler/ScalingExecutorTest.java   |   2 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 flink-kubernetes-operator/pom.xml                  |   1 -
 .../operator/config/FlinkConfigBuilder.java        |  49 ++++++---
 .../operator/service/AbstractFlinkService.java     |   9 +-
 .../service/FlinkResourceContextFactory.java       |   5 +-
 .../src/main/resources/META-INF/NOTICE             |  52 +++++-----
 .../operator/config/FlinkConfigBuilderTest.java    | 111 ++++++++++++---------
 .../metrics/KubernetesOperatorMetricGroupTest.java |   6 +-
 .../operator/service/NativeFlinkServiceTest.java   |  54 ++++++++++
 .../src/test/resources/log4j2-test.properties      |   2 +-
 flink-kubernetes-standalone/pom.xml                |  12 ++-
 .../Fabric8FlinkStandaloneKubeClient.java          |   9 +-
 .../kubeclient/FlinkStandaloneKubeClient.java      |   3 +-
 .../CmdStandaloneJobManagerDecorator.java          |   5 +-
 .../CmdStandaloneTaskManagerDecorator.java         |   5 +-
 .../InitStandaloneTaskManagerDecorator.java        |  15 ++-
 .../decorators/UserLibMountDecorator.java          |  15 ++-
 .../StandaloneKubernetesJobManagerFactory.java     |  13 ++-
 .../StandaloneKubernetesTaskManagerFactory.java    |   9 +-
 .../KubernetesStandaloneClusterDescriptor.java     |   2 +-
 .../Fabric8FlinkStandaloneKubeClientTest.java      |  34 ++++---
 .../InitStandaloneTaskManagerDecoratorTest.java    |  16 +--
 .../decorators/UserLibMountDecoratorTest.java      |  10 +-
 .../StandaloneKubernetesJobManagerFactoryTest.java |  24 ++---
 ...StandaloneKubernetesTaskManagerFactoryTest.java |  18 ++--
 .../kubeclient/parameters/ParametersTestBase.java  |  17 ++--
 .../KubernetesStandaloneClusterDescriptorTest.java |  32 ++++--
 .../src/test/resources/log4j2-test.properties      |   2 +-
 flink-kubernetes-webhook/pom.xml                   |   1 -
 pom.xml                                            |  15 ++-
 57 files changed, 379 insertions(+), 276 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d310b3a4..4386451b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -67,7 +67,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        version: ["v1_16","v1_15","v1_14","v1_13"]
+        version: ["v1_17","v1_16","v1_15","v1_14","v1_13"]
         namespace: ["default","flink"]
         mode: ["native", "standalone"]
         test:
@@ -79,6 +79,8 @@ jobs:
         include:
           - namespace: flink
             extraArgs: '--create-namespace --set "watchNamespaces={default,flink}"'
+          - version: v1_17
+            image: flink:1.17
           - version: v1_16
             image: flink:1.16
           - version: v1_15
diff --git a/docs/content/docs/custom-resource/job-management.md b/docs/content/docs/custom-resource/job-management.md
index 45d08403..a3f37a89 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -111,8 +111,8 @@ kind: FlinkDeployment
 metadata:
   name: basic-checkpoint-ha-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
     state.savepoints.dir: file:///flink-data/savepoints
diff --git a/docs/content/docs/custom-resource/overview.md b/docs/content/docs/custom-resource/overview.md
index 5c15c6a8..83cc17b9 100644
--- a/docs/content/docs/custom-resource/overview.md
+++ b/docs/content/docs/custom-resource/overview.md
@@ -114,8 +114,8 @@ metadata:
   namespace: default
   name: basic-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/docs/content/docs/custom-resource/pod-template.md b/docs/content/docs/custom-resource/pod-template.md
index 5a428f1c..6d704c92 100644
--- a/docs/content/docs/custom-resource/pod-template.md
+++ b/docs/content/docs/custom-resource/pod-template.md
@@ -49,8 +49,8 @@ metadata:
   namespace: default
   name: pod-template-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/docs/content/docs/operations/ingress.md b/docs/content/docs/operations/ingress.md
index ab3c94f0..0947f2d2 100644
--- a/docs/content/docs/operations/ingress.md
+++ b/docs/content/docs/operations/ingress.md
@@ -35,8 +35,8 @@ metadata:
   namespace: default
   name: advanced-ingress
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   ingress:
     template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
     className: "nginx"
diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml
index 2170fbe1..e4273787 100644
--- a/e2e-tests/data/flinkdep-cr.yaml
+++ b/e2e-tests/data/flinkdep-cr.yaml
@@ -22,8 +22,8 @@ metadata:
   namespace: default
   name: flink-example-statemachine
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   ingress:
     template: "/{{namespace}}/{{name}}(/|$)(.*)"
     className: "nginx"
diff --git a/e2e-tests/data/multi-sessionjob.yaml b/e2e-tests/data/multi-sessionjob.yaml
index 276fe77a..160dae68 100644
--- a/e2e-tests/data/multi-sessionjob.yaml
+++ b/e2e-tests/data/multi-sessionjob.yaml
@@ -22,8 +22,8 @@ metadata:
   namespace: default
   name: session-cluster-1
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   ingress:
     template: "/{{namespace}}/{{name}}(/|$)(.*)"
     className: "nginx"
@@ -74,8 +74,8 @@ metadata:
   namespace: flink
   name: session-cluster-1
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   ingress:
     template: "/{{namespace}}/{{name}}(/|$)(.*)"
     className: "nginx"
diff --git a/e2e-tests/data/sessionjob-cr.yaml b/e2e-tests/data/sessionjob-cr.yaml
index cc2f93fb..d10e1cd3 100644
--- a/e2e-tests/data/sessionjob-cr.yaml
+++ b/e2e-tests/data/sessionjob-cr.yaml
@@ -22,8 +22,8 @@ metadata:
   namespace: default
   name: session-cluster-1
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   ingress:
     template: "/{{namespace}}/{{name}}(/|$)(.*)"
     className: "nginx"
diff --git a/examples/advanced-ingress.yaml b/examples/advanced-ingress.yaml
index 50532194..c67e2f36 100644
--- a/examples/advanced-ingress.yaml
+++ b/examples/advanced-ingress.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: advanced-ingress
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   ingress:
     template: "/{{namespace}}/{{name}}(/|$)(.*)"
     className: "nginx"
diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml
index c06c67ab..a6d035d4 100644
--- a/examples/basic-checkpoint-ha.yaml
+++ b/examples/basic-checkpoint-ha.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: basic-checkpoint-ha-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
     state.savepoints.dir: file:///flink-data/savepoints
diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index f60a434a..b81c4f8f 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: basic-ingress
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   ingress:
     template: "{{name}}.{{namespace}}.flink.k8s.io"
   flinkConfiguration:
diff --git a/examples/basic-reactive.yaml b/examples/basic-reactive.yaml
index 8901440a..3a6ad240 100644
--- a/examples/basic-reactive.yaml
+++ b/examples/basic-reactive.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: basic-reactive-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     scheduler-mode: REACTIVE
     taskmanager.numberOfTaskSlots: "2"
diff --git a/examples/basic-session-deployment-and-job.yaml b/examples/basic-session-deployment-and-job.yaml
index 28457b60..ea1d9923 100644
--- a/examples/basic-session-deployment-and-job.yaml
+++ b/examples/basic-session-deployment-and-job.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: basic-session-deployment-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   jobManager:
     resource:
       memory: "2048m"
diff --git a/examples/basic-session-deployment-only.yaml b/examples/basic-session-deployment-only.yaml
index 42c2d715..044554e0 100644
--- a/examples/basic-session-deployment-only.yaml
+++ b/examples/basic-session-deployment-only.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: basic-session-deployment-only-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/examples/basic.yaml b/examples/basic.yaml
index 2cee8ab3..66153d15 100644
--- a/examples/basic.yaml
+++ b/examples/basic.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: basic-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/examples/custom-logging.yaml b/examples/custom-logging.yaml
index 7f3c8412..e3dc70c3 100644
--- a/examples/custom-logging.yaml
+++ b/examples/custom-logging.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: custom-logging-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/examples/flink-python-example/Dockerfile b/examples/flink-python-example/Dockerfile
index 47052007..3046bdf8 100644
--- a/examples/flink-python-example/Dockerfile
+++ b/examples/flink-python-example/Dockerfile
@@ -36,7 +36,7 @@ apt-get clean && \
 rm -rf /var/lib/apt/lists/*
 
 # install PyFlink
-RUN pip3 install "apache-flink>=1.16.0,<1.17.0"
+RUN pip3 install "apache-flink>=1.16.0,<1.17.1"
 
 # add python script
 USER flink
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index a5e0ff20..17ec3c8a 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: pod-template-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/flink-kubernetes-docs/pom.xml b/flink-kubernetes-docs/pom.xml
index 65ed4cb9..4bad2d46 100644
--- a/flink-kubernetes-docs/pom.xml
+++ b/flink-kubernetes-docs/pom.xml
@@ -54,7 +54,6 @@ under the License.
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
-            <version>${junit.jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/flink-kubernetes-operator-api/pom.xml b/flink-kubernetes-operator-api/pom.xml
index 05a85b1f..8ed1976d 100644
--- a/flink-kubernetes-operator-api/pom.xml
+++ b/flink-kubernetes-operator-api/pom.xml
@@ -127,7 +127,6 @@ under the License.
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-params</artifactId>
-            <version>${junit.jupiter.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index ea4e1074..372f9d0e 100644
--- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -59,7 +59,7 @@ public class BaseTestUtils {
     public static final String SAMPLE_JAR = "local:///tmp/sample.jar";
 
     public static FlinkDeployment buildSessionCluster() {
-        return buildSessionCluster(FlinkVersion.v1_15);
+        return buildSessionCluster(FlinkVersion.v1_17);
     }
 
     public static FlinkDeployment buildSessionCluster(FlinkVersion version) {
@@ -83,11 +83,11 @@ public class BaseTestUtils {
     }
 
     public static FlinkDeployment buildApplicationCluster() {
-        return buildApplicationCluster(FlinkVersion.v1_15);
+        return buildApplicationCluster(FlinkVersion.v1_17);
     }
 
     public static FlinkDeployment buildApplicationCluster(String name, String namespace) {
-        return buildApplicationCluster(name, namespace, FlinkVersion.v1_15);
+        return buildApplicationCluster(name, namespace, FlinkVersion.v1_17);
     }
 
     public static FlinkDeployment buildApplicationCluster(FlinkVersion version) {
diff --git a/flink-kubernetes-operator/src/test/resources/log4j2-test.properties b/flink-kubernetes-operator-api/src/test/resources/log4j2-test.properties
similarity index 98%
copy from flink-kubernetes-operator/src/test/resources/log4j2-test.properties
copy to flink-kubernetes-operator-api/src/test/resources/log4j2-test.properties
index 3b7cec7a..47b66644 100644
--- a/flink-kubernetes-operator/src/test/resources/log4j2-test.properties
+++ b/flink-kubernetes-operator-api/src/test/resources/log4j2-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-rootLogger.level = DEBUG
+rootLogger.level = INFO
 rootLogger.appenderRef.console.ref = ConsoleAppender
 
 # Log all infos to the console
diff --git a/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml b/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
index e130046a..d82ef371 100644
--- a/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
+++ b/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: basic-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml b/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
index 2cee8ab3..66153d15 100644
--- a/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
+++ b/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
 metadata:
   name: basic-example
 spec:
-  image: flink:1.16
-  flinkVersion: v1_16
+  image: flink:1.17
+  flinkVersion: v1_17
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
   serviceAccount: flink
diff --git a/flink-kubernetes-operator-autoscaler/pom.xml b/flink-kubernetes-operator-autoscaler/pom.xml
index 267fdee9..66307e8c 100644
--- a/flink-kubernetes-operator-autoscaler/pom.xml
+++ b/flink-kubernetes-operator-autoscaler/pom.xml
@@ -81,7 +81,6 @@ under the License.
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-params</artifactId>
-            <version>${junit.jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index 15d9dcba..a5da1cb7 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -19,8 +19,6 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
@@ -43,6 +41,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
+import static org.apache.flink.configuration.PipelineOptions.PARALLELISM_OVERRIDES;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.STABILIZATION_INTERVAL;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
@@ -53,15 +52,6 @@ import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMet
 
 /** Class responsible for executing scaling decisions. */
 public class ScalingExecutor {
-
-    public static final ConfigOption<Map<String, String>> PARALLELISM_OVERRIDES =
-            ConfigOptions.key("pipeline.jobvertex-parallelism-overrides")
-                    .mapType()
-                    .defaultValue(Collections.emptyMap())
-                    .withDescription(
-                            "A parallelism override map (jobVertexId -> parallelism) which will be used to update"
-                                    + " the parallelism of the corresponding job vertices of submitted JobGraphs.");
-
     public static final String SCALING_SUMMARY_ENTRY =
             " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
     public static final String SCALING_SUMMARY_HEADER_SCALING_DISABLED =
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index b38059a4..dbdf1451 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -44,7 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.PARALLELISM_OVERRIDES;
+import static org.apache.flink.configuration.PipelineOptions.PARALLELISM_OVERRIDES;
 import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.SCALING_SUMMARY_ENTRY;
 import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_DISABLED;
 import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_ENABLED;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties b/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties
index 3b7cec7a..47b66644 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties
+++ b/flink-kubernetes-operator-autoscaler/src/test/resources/log4j2-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-rootLogger.level = DEBUG
+rootLogger.level = INFO
 rootLogger.appenderRef.console.ref = ConsoleAppender
 
 # Log all infos to the console
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index 3cb46b8c..735dfacb 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -191,7 +191,6 @@ under the License.
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-params</artifactId>
-            <version>${junit.jupiter.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index dd2fd525..f78c8b0a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -118,7 +117,13 @@ public class FlinkConfigBuilder {
 
     protected FlinkConfigBuilder applyImage() {
         if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
-            effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage());
+            String configKey;
+            if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_16)) {
+                configKey = KubernetesConfigOptions.CONTAINER_IMAGE.key();
+            } else {
+                configKey = "kubernetes.container.image";
+            }
+            effectiveConfig.setString(configKey, spec.getImage());
         }
         return this;
     }
@@ -187,9 +192,8 @@ public class FlinkConfigBuilder {
 
     protected FlinkConfigBuilder applyCommonPodTemplate() throws IOException {
         if (spec.getPodTemplate() != null) {
-            effectiveConfig.set(
-                    KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE,
-                    createTempFile(spec.getPodTemplate()));
+            effectiveConfig.setString(
+                    "kubernetes.pod-template-file", createTempFile(spec.getPodTemplate()));
         }
         return this;
     }
@@ -379,24 +383,41 @@ public class FlinkConfigBuilder {
         }
     }
 
-    private static void setResource(
-            Resource resource, Configuration effectiveConfig, boolean isJM) {
+    private void setResource(Resource resource, Configuration effectiveConfig, boolean isJM) {
         if (resource != null) {
-            final ConfigOption<MemorySize> memoryConfigOption =
+            var memoryConfigOption =
                     isJM
                             ? JobManagerOptions.TOTAL_PROCESS_MEMORY
                             : TaskManagerOptions.TOTAL_PROCESS_MEMORY;
-            final ConfigOption<Double> cpuConfigOption =
-                    isJM
-                            ? KubernetesConfigOptions.JOB_MANAGER_CPU
-                            : KubernetesConfigOptions.TASK_MANAGER_CPU;
             if (resource.getMemory() != null) {
                 effectiveConfig.setString(memoryConfigOption.key(), resource.getMemory());
             }
-            if (resource.getCpu() != null) {
-                effectiveConfig.setDouble(cpuConfigOption.key(), resource.getCpu());
+
+            configureCpu(resource, effectiveConfig, isJM);
+        }
+    }
+
+    private void configureCpu(Resource resource, Configuration conf, boolean isJM) {
+        if (resource.getCpu() == null) {
+            return;
+        }
+
+        boolean newConfKeys = spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_16);
+        String configKey;
+        if (isJM) {
+            if (newConfKeys) {
+                configKey = KubernetesConfigOptions.JOB_MANAGER_CPU.key();
+            } else {
+                configKey = "kubernetes.jobmanager.cpu";
+            }
+        } else {
+            if (newConfKeys) {
+                configKey = KubernetesConfigOptions.TASK_MANAGER_CPU.key();
+            } else {
+                configKey = "kubernetes.taskmanager.cpu";
             }
         }
+        conf.setDouble(configKey, resource.getCpu());
     }
 
     private static void setPodTemplate(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index aef44c6f..90866abd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -722,6 +722,9 @@ public abstract class AbstractFlinkService implements FlinkService {
                             savepoint,
                             conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)
                                     ? RestoreMode.DEFAULT
+                                    : null,
+                            conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16)
+                                    ? conf.toMap()
                                     : null);
             LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
             return clusterClient
@@ -740,7 +743,8 @@ public abstract class AbstractFlinkService implements FlinkService {
         }
     }
 
-    private JarUploadResponseBody uploadJar(
+    @VisibleForTesting
+    protected JarUploadResponseBody uploadJar(
             ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception {
         String targetDir = artifactManager.generateJarDir(objectMeta, spec);
         File jarFile = artifactManager.fetch(findJarURI(spec.getJob()), conf, targetDir);
@@ -788,7 +792,8 @@ public abstract class AbstractFlinkService implements FlinkService {
         }
     }
 
-    private void deleteJar(Configuration conf, String jarId) {
+    @VisibleForTesting
+    protected void deleteJar(Configuration conf, String jarId) {
         LOG.debug("Deleting the jar: {}", jarId);
         try (RestClusterClient<String> clusterClient =
                 (RestClusterClient<String>) getClusterClient(conf)) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
index a20e46cf..9ba9ca3f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
@@ -97,10 +97,10 @@ public class FlinkResourceContextFactory {
                 mode -> {
                     switch (mode) {
                         case NATIVE:
-                            LOG.info("Using NativeFlinkService");
+                            LOG.debug("Using NativeFlinkService");
                             return new NativeFlinkService(kubernetesClient, configManager);
                         case STANDALONE:
-                            LOG.info("Using StandaloneFlinkService");
+                            LOG.debug("Using StandaloneFlinkService");
                             return new StandaloneFlinkService(kubernetesClient, configManager);
                         default:
                             throw new UnsupportedOperationException(
@@ -111,7 +111,6 @@ public class FlinkResourceContextFactory {
 
     @VisibleForTesting
     protected FlinkService getOrCreateFlinkService(FlinkDeployment deployment) {
-        LOG.info("Getting service for {}", deployment.getMetadata().getName());
         return getOrCreateFlinkService(getDeploymentMode(deployment));
     }
 
diff --git a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
index 81159f45..efd84882 100644
--- a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
+++ b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
@@ -20,33 +20,33 @@ This project bundles the following dependencies under the Apache Software Licens
 - commons-cli:commons-cli:1.5.0
 - commons-collections:commons-collections:3.2.2
 - commons-io:commons-io:2.11.0
-- io.fabric8:kubernetes-client-api:jar:6.5.1
-- io.fabric8:kubernetes-client:jar:6.5.1
-- io.fabric8:kubernetes-httpclient-okhttp:jar:6.5.1
-- io.fabric8:kubernetes-model-admissionregistration:jar:6.5.1
-- io.fabric8:kubernetes-model-apiextensions:jar:6.5.1
-- io.fabric8:kubernetes-model-apps:jar:6.5.1
-- io.fabric8:kubernetes-model-autoscaling:jar:6.5.1
-- io.fabric8:kubernetes-model-batch:jar:6.5.1
-- io.fabric8:kubernetes-model-certificates:jar:6.5.1
-- io.fabric8:kubernetes-model-common:jar:6.5.1
-- io.fabric8:kubernetes-model-coordination:jar:6.5.1
-- io.fabric8:kubernetes-model-core:jar:6.5.1
-- io.fabric8:kubernetes-model-discovery:jar:6.5.1
-- io.fabric8:kubernetes-model-events:jar:6.5.1
-- io.fabric8:kubernetes-model-extensions:jar:6.5.1
-- io.fabric8:kubernetes-model-flowcontrol:jar:6.5.1
-- io.fabric8:kubernetes-model-gatewayapi:jar:6.5.1
-- io.fabric8:kubernetes-model-metrics:jar:6.5.1
-- io.fabric8:kubernetes-model-networking:jar:6.5.1
-- io.fabric8:kubernetes-model-node:jar:6.5.1
-- io.fabric8:kubernetes-model-policy:jar:6.5.1
-- io.fabric8:kubernetes-model-rbac:jar:6.5.1
-- io.fabric8:kubernetes-model-scheduling:jar:6.5.1
-- io.fabric8:kubernetes-model-storageclass:jar:6.5.1
+- io.fabric8:kubernetes-client-api:jar:6.7.0
+- io.fabric8:kubernetes-client:jar:6.7.0
+- io.fabric8:kubernetes-httpclient-okhttp:jar:6.7.0
+- io.fabric8:kubernetes-model-admissionregistration:jar:6.7.0
+- io.fabric8:kubernetes-model-apiextensions:jar:6.7.0
+- io.fabric8:kubernetes-model-apps:jar:6.7.0
+- io.fabric8:kubernetes-model-autoscaling:jar:6.7.0
+- io.fabric8:kubernetes-model-batch:jar:6.7.0
+- io.fabric8:kubernetes-model-certificates:jar:6.7.0
+- io.fabric8:kubernetes-model-common:jar:6.7.0
+- io.fabric8:kubernetes-model-coordination:jar:6.7.0
+- io.fabric8:kubernetes-model-core:jar:6.7.0
+- io.fabric8:kubernetes-model-discovery:jar:6.7.0
+- io.fabric8:kubernetes-model-events:jar:6.7.0
+- io.fabric8:kubernetes-model-extensions:jar:6.7.0
+- io.fabric8:kubernetes-model-flowcontrol:jar:6.7.0
+- io.fabric8:kubernetes-model-gatewayapi:jar:6.7.0
+- io.fabric8:kubernetes-model-metrics:jar:6.7.0
+- io.fabric8:kubernetes-model-networking:jar:6.7.0
+- io.fabric8:kubernetes-model-node:jar:6.7.0
+- io.fabric8:kubernetes-model-policy:jar:6.7.0
+- io.fabric8:kubernetes-model-rbac:jar:6.7.0
+- io.fabric8:kubernetes-model-scheduling:jar:6.7.0
+- io.fabric8:kubernetes-model-storageclass:jar:6.7.0
 - io.fabric8:zjsonpatch:jar:0.3.0
-- io.javaoperatorsdk:operator-framework-core:jar:4.3.0
-- io.javaoperatorsdk:operator-framework:jar:4.3.0
+- io.javaoperatorsdk:operator-framework-core:jar:4.3.5
+- io.javaoperatorsdk:operator-framework:jar:4.3.5
 - org.apache.commons:commons-compress:1.21
 - org.apache.commons:commons-lang3:3.12.0
 - org.apache.commons:commons-math3:3.6.1
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index a90fe373..1bb05337 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -76,6 +76,8 @@ import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.DEF
 import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 /** FlinkConfigBuilderTest. */
 public class FlinkConfigBuilderTest {
@@ -105,7 +107,7 @@ public class FlinkConfigBuilderTest {
     public void testApplyImage() {
         final Configuration configuration =
                 new FlinkConfigBuilder(flinkDeployment, new 
Configuration()).applyImage().build();
-        Assertions.assertEquals(IMAGE, 
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE));
+        assertEquals(IMAGE, 
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE));
     }
 
     @Test
@@ -114,7 +116,7 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyImagePullPolicy()
                         .build();
-        Assertions.assertEquals(
+        assertEquals(
                 IMAGE_POLICY,
                 
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY).toString());
     }
@@ -125,12 +127,12 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyFlinkConfiguration()
                         .build();
-        Assertions.assertEquals(2, (int) 
configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
-        Assertions.assertEquals(
+        assertEquals(2, (int) 
configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
+        assertEquals(
                 KubernetesConfigOptions.ServiceExposedType.ClusterIP,
                 
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
-        Assertions.assertEquals(false, 
configuration.get(WebOptions.CANCEL_ENABLE));
-        Assertions.assertEquals(
+        assertEquals(false, configuration.get(WebOptions.CANCEL_ENABLE));
+        assertEquals(
                 flinkDeployment.getMetadata().getName(), 
configuration.get(PipelineOptions.NAME));
 
         FlinkDeployment deployment = 
ReconciliationUtils.clone(flinkDeployment);
@@ -145,7 +147,7 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(deployment, new Configuration())
                         .applyFlinkConfiguration()
                         .build();
-        Assertions.assertEquals(
+        assertEquals(
                 KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
                 
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
 
@@ -160,7 +162,7 @@ public class FlinkConfigBuilderTest {
                                                         .getCanonicalName()))
                         .applyFlinkConfiguration()
                         .build();
-        Assertions.assertEquals(
+        assertEquals(
                 DEFAULT_CHECKPOINTING_INTERVAL,
                 
configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL));
 
@@ -169,7 +171,7 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(deployment, new Configuration())
                         .applyFlinkConfiguration()
                         .build();
-        Assertions.assertEquals(false, 
configuration.get(WebOptions.CANCEL_ENABLE));
+        assertEquals(false, configuration.get(WebOptions.CANCEL_ENABLE));
     }
 
     @ParameterizedTest
@@ -200,7 +202,7 @@ public class FlinkConfigBuilderTest {
                         configuration.get(DeploymentOptionsInternal.CONF_DIR),
                         CONFIG_FILE_LOG4J_NAME);
         Assertions.assertTrue(log4jFile.exists() && log4jFile.isFile() && 
log4jFile.canRead());
-        Assertions.assertEquals(CUSTOM_LOG_CONFIG, 
Files.readString(log4jFile.toPath()));
+        assertEquals(CUSTOM_LOG_CONFIG, Files.readString(log4jFile.toPath()));
     }
 
     @Test
@@ -221,8 +223,8 @@ public class FlinkConfigBuilderTest {
                                 configuration.getString(
                                         
KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
                         Pod.class);
-        Assertions.assertEquals("container0", 
jmPod.getSpec().getContainers().get(0).getName());
-        Assertions.assertEquals("container0", 
tmPod.getSpec().getContainers().get(0).getName());
+        assertEquals("container0", 
jmPod.getSpec().getContainers().get(0).getName());
+        assertEquals("container0", 
tmPod.getSpec().getContainers().get(0).getName());
 
         flinkDeployment.getSpec().setPodTemplate(null);
         flinkDeployment.getSpec().setTaskManager(null);
@@ -262,7 +264,7 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyIngressDomain()
                         .build();
-        Assertions.assertEquals(
+        assertEquals(
                 KubernetesConfigOptions.ServiceExposedType.ClusterIP,
                 
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
     }
@@ -273,11 +275,32 @@ public class FlinkConfigBuilderTest {
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyServiceAccount()
                         .build();
-        Assertions.assertEquals(
+        assertEquals(
                 SERVICE_ACCOUNT,
                 
configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
     }
 
+    @Test
+    public void testDeprecatedConfigKeys() throws Exception {
+        FlinkDeployment deploymentClone = 
ReconciliationUtils.clone(flinkDeployment);
+
+        // We must use deprecated configs for 1.16 and before
+        deploymentClone.getSpec().setFlinkVersion(FlinkVersion.v1_16);
+        deploymentClone.getSpec().setPodTemplate(new Pod());
+        Configuration configuration =
+                new FlinkConfigBuilder(deploymentClone, new Configuration())
+                        .applyJobManagerSpec()
+                        .applyTaskManagerSpec()
+                        .applyCommonPodTemplate()
+                        .build();
+
+        var confMap = configuration.toMap();
+
+        assertEquals("1.0", confMap.get("kubernetes.jobmanager.cpu"));
+        assertEquals("1.0", confMap.get("kubernetes.taskmanager.cpu"));
+        assertNotNull(confMap.get("kubernetes.pod-template-file"));
+    }
+
     @Test
     public void testApplyJobManagerSpec() throws Exception {
         FlinkDeployment deploymentClone = 
ReconciliationUtils.clone(flinkDeployment);
@@ -287,12 +310,11 @@ public class FlinkConfigBuilderTest {
                         .applyJobManagerSpec()
                         .build();
 
-        Assertions.assertEquals(
+        assertEquals(
                 MemorySize.parse("2048m"),
                 configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
-        Assertions.assertEquals(
-                Double.valueOf(1), 
configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
-        Assertions.assertEquals(
+        assertEquals(Double.valueOf(1), 
configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
+        assertEquals(
                 Integer.valueOf(2),
                 
configuration.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS));
 
@@ -350,7 +372,7 @@ public class FlinkConfigBuilderTest {
                                 configuration.getString(
                                         
KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)),
                         Pod.class);
-        Assertions.assertEquals("pod1 api version", jmPod.getApiVersion());
+        assertEquals("pod1 api version", jmPod.getApiVersion());
         
assertMainContainerEphemeralStorage(jmPod.getSpec().getContainers().get(1), 
"2G");
     }
 
@@ -363,10 +385,10 @@ public class FlinkConfigBuilderTest {
                         .applyTaskManagerSpec()
                         .build();
 
-        Assertions.assertEquals(
+        assertEquals(
                 MemorySize.parse("2048m"),
                 configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
-        Assertions.assertEquals(
+        assertEquals(
                 Double.valueOf(1), 
configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
 
         Pod tmPod =
@@ -423,7 +445,7 @@ public class FlinkConfigBuilderTest {
                                 configuration.getString(
                                         
KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
                         Pod.class);
-        Assertions.assertEquals("pod2 api version", tmPod.getApiVersion());
+        assertEquals("pod2 api version", tmPod.getApiVersion());
         
assertMainContainerEphemeralStorage(tmPod.getSpec().getContainers().get(1), 
"2G");
     }
 
@@ -438,13 +460,12 @@ public class FlinkConfigBuilderTest {
                         .build();
         Assertions.assertTrue(
                 
configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
-        Assertions.assertEquals(
+        assertEquals(
                 KubernetesDeploymentTarget.APPLICATION.getName(),
                 configuration.get(DeploymentOptions.TARGET));
-        Assertions.assertEquals(SAMPLE_JAR, 
configuration.get(PipelineOptions.JARS).get(0));
-        Assertions.assertEquals(
-                Integer.valueOf(2), 
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
-        Assertions.assertEquals(
+        assertEquals(SAMPLE_JAR, 
configuration.get(PipelineOptions.JARS).get(0));
+        assertEquals(Integer.valueOf(2), 
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+        assertEquals(
                 List.of("--test", "123"),
                 configuration.get(ApplicationConfiguration.APPLICATION_ARGS));
 
@@ -458,7 +479,7 @@ public class FlinkConfigBuilderTest {
                         .applyJobOrSessionSpec()
                         .build();
 
-        Assertions.assertEquals(12, 
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+        assertEquals(12, configuration.get(CoreOptions.DEFAULT_PARALLELISM));
     }
 
     @Test
@@ -521,15 +542,15 @@ public class FlinkConfigBuilderTest {
                         .applyJobOrSessionSpec()
                         .build();
 
-        Assertions.assertEquals("remote", 
configuration.getString(DeploymentOptions.TARGET));
-        Assertions.assertEquals(
+        assertEquals("remote", 
configuration.getString(DeploymentOptions.TARGET));
+        assertEquals(
                 
StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION,
                 
configuration.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE));
-        Assertions.assertEquals(6, 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM));
-        Assertions.assertEquals(
+        assertEquals(6, 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM));
+        assertEquals(
                 entryClass,
                 
configuration.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS));
-        Assertions.assertEquals(
+        assertEquals(
                 3,
                 configuration.get(
                         
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
@@ -547,7 +568,7 @@ public class FlinkConfigBuilderTest {
                         .applyTaskManagerSpec()
                         .applyJobOrSessionSpec()
                         .build();
-        Assertions.assertEquals(
+        assertEquals(
                 5,
                 configuration.get(
                         
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
@@ -569,11 +590,11 @@ public class FlinkConfigBuilderTest {
                         .applyJobOrSessionSpec()
                         .build();
 
-        Assertions.assertEquals("remote", 
configuration.getString(DeploymentOptions.TARGET));
-        Assertions.assertEquals(
+        assertEquals("remote", 
configuration.getString(DeploymentOptions.TARGET));
+        assertEquals(
                 StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION,
                 
configuration.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE));
-        Assertions.assertEquals(
+        assertEquals(
                 5,
                 configuration.get(
                         
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
@@ -591,8 +612,8 @@ public class FlinkConfigBuilderTest {
         final String clusterId = flinkDeployment.getMetadata().getName();
         // Most configs have been tested by previous unit tests, thus we only 
verify the namespace
         // and clusterId here.
-        Assertions.assertEquals(namespace, 
configuration.get(KubernetesConfigOptions.NAMESPACE));
-        Assertions.assertEquals(clusterId, 
configuration.get(KubernetesConfigOptions.CLUSTER_ID));
+        assertEquals(namespace, 
configuration.get(KubernetesConfigOptions.NAMESPACE));
+        assertEquals(clusterId, 
configuration.get(KubernetesConfigOptions.CLUSTER_ID));
     }
 
     @Test
@@ -600,30 +621,28 @@ public class FlinkConfigBuilderTest {
         Resource resource = 
flinkDeployment.getSpec().getTaskManager().getResource();
 
         Pod pod = FlinkConfigBuilder.applyResourceToPodTemplate(null, 
resource);
-        Assertions.assertEquals(
-                Constants.MAIN_CONTAINER_NAME, 
pod.getSpec().getContainers().get(0).getName());
+        assertEquals(Constants.MAIN_CONTAINER_NAME, 
pod.getSpec().getContainers().get(0).getName());
         
assertMainContainerEphemeralStorage(pod.getSpec().getContainers().get(0), "2G");
 
         Pod podWithMetadata = new Pod();
         ObjectMeta metaData = new ObjectMeta();
         podWithMetadata.setMetadata(metaData);
         pod = FlinkConfigBuilder.applyResourceToPodTemplate(podWithMetadata, 
resource);
-        Assertions.assertEquals(metaData, pod.getMetadata());
-        Assertions.assertEquals(
-                Constants.MAIN_CONTAINER_NAME, 
pod.getSpec().getContainers().get(0).getName());
+        assertEquals(metaData, pod.getMetadata());
+        assertEquals(Constants.MAIN_CONTAINER_NAME, 
pod.getSpec().getContainers().get(0).getName());
         
assertMainContainerEphemeralStorage(pod.getSpec().getContainers().get(0), "2G");
     }
 
     private void assertMainContainerEphemeralStorage(
             Container container, String expectedEphemeralStorage) {
-        Assertions.assertEquals(
+        assertEquals(
                 expectedEphemeralStorage,
                 container
                         .getResources()
                         .getLimits()
                         .get(CrdConstants.EPHEMERAL_STORAGE)
                         .toString());
-        Assertions.assertEquals(
+        assertEquals(
                 expectedEphemeralStorage,
                 container
                         .getResources()
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
index 36b42f83..aec62f01 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
@@ -63,7 +63,7 @@ public class KubernetesOperatorMetricGroupTest {
                         "flink-kubernetes-operator"),
                 group.getAllVariables());
 
-        registry.shutdown().get();
+        registry.close();
     }
 
     @Test
@@ -96,7 +96,7 @@ public class KubernetesOperatorMetricGroupTest {
                         "flink-kubernetes-operator"),
                 group.getAllVariables());
 
-        registry.shutdown().get();
+        registry.close();
     }
 
     @Test
@@ -145,7 +145,7 @@ public class KubernetesOperatorMetricGroupTest {
                         "<resourcetype>",
                         "FlinkSessionJob"),
                 resourceGroup.getAllVariables());
-        registry.shutdown().get();
+        registry.close();
     }
 
     private static MetricRegistryConfiguration fromConfiguration(Configuration 
configuration) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 805cf582..feaf49eb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingClusterClient;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
@@ -48,6 +49,8 @@ import org.apache.flink.runtime.rest.messages.TriggerId;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -512,6 +515,57 @@ public class NativeFlinkServiceTest {
         assertEquals(DeletionPropagation.BACKGROUND, propagation.get(1));
     }
 
+    @Test
+    public void testSendConfigOnRunJar() throws Exception {
+        var jarRuns = new ArrayList<JarRunRequestBody>();
+        var flinkService =
+                new NativeFlinkService(client, configManager) {
+                    @Override
+                    public ClusterClient<String> 
getClusterClient(Configuration conf)
+                            throws Exception {
+                        var client = new TestingClusterClient<String>(conf);
+                        client.setRequestProcessor(
+                                (h, p, b) -> {
+                                    jarRuns.add((JarRunRequestBody) b);
+                                    return 
CompletableFuture.completedFuture(null);
+                                });
+                        return client;
+                    }
+
+                    @Override
+                    protected JarUploadResponseBody uploadJar(
+                            ObjectMeta objectMeta, FlinkSessionJobSpec spec, 
Configuration conf) {
+                        return new JarUploadResponseBody("test");
+                    }
+
+                    @Override
+                    protected void deleteJar(Configuration conf, String jarId) 
{}
+                };
+
+        var session = TestUtils.buildSessionCluster();
+        session.getSpec().setFlinkVersion(FlinkVersion.v1_17);
+        session.getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(session.getSpec(), session);
+
+        var job = TestUtils.buildSessionJob();
+        var deployConf = configManager.getSessionJobConfig(session, 
job.getSpec());
+        flinkService.submitJobToSessionCluster(job.getMetadata(), 
job.getSpec(), deployConf, null);
+
+        // Make sure that deploy conf was passed to jar run
+        assertEquals(deployConf.toMap(), 
jarRuns.get(0).getFlinkConfiguration().toMap());
+
+        session.getSpec().setFlinkVersion(FlinkVersion.v1_16);
+        session.getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(session.getSpec(), session);
+
+        deployConf = configManager.getSessionJobConfig(session, job.getSpec());
+        flinkService.submitJobToSessionCluster(job.getMetadata(), 
job.getSpec(), deployConf, null);
+
+        assertTrue(jarRuns.get(1).getFlinkConfiguration().toMap().isEmpty());
+    }
+
     class TestingNativeFlinkService extends NativeFlinkService {
         private Configuration runtimeConfig;
 
diff --git 
a/flink-kubernetes-operator/src/test/resources/log4j2-test.properties 
b/flink-kubernetes-operator/src/test/resources/log4j2-test.properties
index 3b7cec7a..47b66644 100644
--- a/flink-kubernetes-operator/src/test/resources/log4j2-test.properties
+++ b/flink-kubernetes-operator/src/test/resources/log4j2-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-rootLogger.level = DEBUG
+rootLogger.level = INFO
 rootLogger.appenderRef.console.ref = ConsoleAppender
 
 # Log all infos to the console
diff --git a/flink-kubernetes-standalone/pom.xml 
b/flink-kubernetes-standalone/pom.xml
index ab0f5589..7bfcc3ca 100644
--- a/flink-kubernetes-standalone/pom.xml
+++ b/flink-kubernetes-standalone/pom.xml
@@ -61,7 +61,6 @@ under the License.
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
-            <version>${junit.jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
@@ -78,6 +77,13 @@ under the License.
             <version>${fabric8.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -103,10 +109,6 @@ under the License.
                                 </includes>
                             </artifactSet>
                             <relocations>
-                                <relocation>
-                                    <pattern>io.fabric8</pattern>
-                                    
<shadedPattern>org.apache.flink.kubernetes.shaded.io.fabric8</shadedPattern>
-                                </relocation>
                                 <!-- FLINK-29384, Since the transitive 
snakeyaml that we are filtering out from
                                     flink-kubernetes was relocated we have to 
bundle a relocated new version -->
                                 <relocation>
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
index 76063e25..677f4c35 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
@@ -22,11 +22,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
-
-import io.fabric8.kubernetes.api.model.DeletionPropagation;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.DeletionPropagation;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 
 import java.util.concurrent.ExecutorService;
 
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java
index 9a7f8207..160d9d8d 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java
@@ -18,8 +18,7 @@
 package org.apache.flink.kubernetes.operator.kubeclient;
 
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
-
-import io.fabric8.kubernetes.api.model.apps.Deployment;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
 
 /** Extension of the FlinkKubeClient that is used for Flink standalone 
deployments. */
 public interface FlinkStandaloneKubeClient extends FlinkKubeClient {
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
index a2353e38..2d063eaf 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -20,11 +20,10 @@ package 
org.apache.flink.kubernetes.operator.kubeclient.decorators;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder;
 import org.apache.flink.kubernetes.utils.Constants;
 
-import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.ContainerBuilder;
-
 import java.util.ArrayList;
 import java.util.List;
 
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java
index 1533e0b2..fc2642b8 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java
@@ -20,9 +20,8 @@ package 
org.apache.flink.kubernetes.operator.kubeclient.decorators;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
-
-import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java
index adbc1a95..73f7088e 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java
@@ -21,17 +21,16 @@ import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesToleration;
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.EnvVar;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ResourceRequirements;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
-import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.ContainerBuilder;
-import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
-import io.fabric8.kubernetes.api.model.EnvVar;
-import io.fabric8.kubernetes.api.model.EnvVarBuilder;
-import io.fabric8.kubernetes.api.model.PodBuilder;
-import io.fabric8.kubernetes.api.model.ResourceRequirements;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
index 686bf0f1..ae92872c 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
@@ -20,14 +20,13 @@ package 
org.apache.flink.kubernetes.operator.kubeclient.decorators;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
-
-import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.ContainerBuilder;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodBuilder;
-import io.fabric8.kubernetes.api.model.Volume;
-import io.fabric8.kubernetes.api.model.VolumeBuilder;
-import io.fabric8.kubernetes.api.model.VolumeMount;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Volume;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeMount;
 
 import java.util.List;
 
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java
index f789db93..b01d8b08 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java
@@ -34,16 +34,15 @@ import 
org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneJ
 import 
org.apache.flink.kubernetes.operator.kubeclient.decorators.UserLibMountDecorator;
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.Preconditions;
 
-import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodBuilder;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java
index 1a8c705d..55ef5dca 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java
@@ -29,14 +29,13 @@ import 
org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneT
 import 
org.apache.flink.kubernetes.operator.kubeclient.decorators.InitStandaloneTaskManagerDecorator;
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.Preconditions;
 
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodBuilder;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
-
 import java.util.stream.Collectors;
 
 /** Utility class for constructing the TaskManager Deployment when deploying in standalone mode. */
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
index 1bed020b..751223f8 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKuberne
 import 
org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory;
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -48,7 +49,6 @@ import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClie
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.rpc.AddressResolution;
 
-import io.fabric8.kubernetes.api.model.apps.Deployment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
index 9c52eb32..671a07d0 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
@@ -26,10 +26,13 @@ import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKuberne
 import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
 import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Config;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.ConfigBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.apache.flink.util.concurrent.Executors;
 
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.BeforeEach;
@@ -37,15 +40,13 @@ import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
+import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.TEST_NAMESPACE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** @link Fabric8FlinkStandaloneKubeClient unit tests */
-@EnableKubernetesMockClient(crud = true)
+@EnableKubernetesMockClient(crud = true, https = false)
 public class Fabric8FlinkStandaloneKubeClientTest {
-    private static final String NAMESPACE = "test";
-
-    KubernetesMockServer mockServer;
-    protected NamespacedKubernetesClient kubernetesClient;
+    KubernetesMockServer mockWebServer;
     private FlinkStandaloneKubeClient flinkKubeClient;
     private StandaloneKubernetesTaskManagerParameters taskManagerParameters;
     private Deployment tmDeployment;
@@ -55,11 +56,10 @@ public class Fabric8FlinkStandaloneKubeClientTest {
     @BeforeEach
     public final void setup() {
         flinkConfig = TestUtils.createTestFlinkConfig();
-        kubernetesClient = mockServer.createClient();
 
         flinkKubeClient =
                 new Fabric8FlinkStandaloneKubeClient(
-                        flinkConfig, kubernetesClient, Executors.newDirectExecutorService());
+                        flinkConfig, getClient(), Executors.newDirectExecutorService());
         clusterSpecification = TestUtils.createClusterSpecification();
 
         taskManagerParameters =
@@ -73,9 +73,8 @@ public class Fabric8FlinkStandaloneKubeClientTest {
     @Test
     public void testCreateTaskManagerDeployment() {
         flinkKubeClient.createTaskManagerDeployment(tmDeployment);
-
         final List<Deployment> resultedDeployments =
-                kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
+                getClient().apps().deployments().inNamespace(TEST_NAMESPACE).list().getItems();
         assertEquals(1, resultedDeployments.size());
     }
 
@@ -92,13 +91,22 @@ public class Fabric8FlinkStandaloneKubeClientTest {
         flinkKubeClient.createTaskManagerDeployment(tmDeployment);
 
         List<Deployment> resultedDeployments =
-                kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
+                getClient().apps().deployments().inNamespace(TEST_NAMESPACE).list().getItems();
         assertEquals(2, resultedDeployments.size());
 
         flinkKubeClient.stopAndCleanupCluster(taskManagerParameters.getClusterId());
 
         resultedDeployments =
-                kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
+                getClient().apps().deployments().inNamespace(TEST_NAMESPACE).list().getItems();
         assertEquals(0, resultedDeployments.size());
     }
+
+    private NamespacedKubernetesClient getClient() {
+        var config =
+                new ConfigBuilder(Config.empty())
+                        .withMasterUrl(mockWebServer.url("/").toString())
+                        .withHttp2Disable(true)
+                        .build();
+        return new DefaultKubernetesClient(config).inNamespace(TEST_NAMESPACE);
+    }
 }
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java
index 36483adf..49da3158 100644
--- 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java
@@ -23,16 +23,16 @@ import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTest
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
 import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerPort;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.EnvVar;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.LocalObjectReference;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Quantity;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ResourceRequirements;
 import org.apache.flink.kubernetes.utils.Constants;
 
-import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.ContainerPort;
-import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
-import io.fabric8.kubernetes.api.model.EnvVar;
-import io.fabric8.kubernetes.api.model.LocalObjectReference;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.Quantity;
-import io.fabric8.kubernetes.api.model.ResourceRequirements;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
index ba229965..1ff98df3 100644
--- 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
@@ -23,12 +23,12 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
 import 
org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodSpecBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeMount;
 
-import io.fabric8.kubernetes.api.model.ContainerBuilder;
-import io.fabric8.kubernetes.api.model.PodBuilder;
-import io.fabric8.kubernetes.api.model.PodSpecBuilder;
-import io.fabric8.kubernetes.api.model.VolumeBuilder;
-import io.fabric8.kubernetes.api.model.VolumeMount;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java
index 4e341c58..29883805 100644
--- 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java
@@ -30,20 +30,20 @@ import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTest
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerPort;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ObjectMeta;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.OwnerReference;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodSpec;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Quantity;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ResourceRequirements;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Service;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
 import org.apache.flink.kubernetes.utils.Constants;
 
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.ContainerPort;
-import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
-import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.OwnerReference;
-import io.fabric8.kubernetes.api.model.PodSpec;
-import io.fabric8.kubernetes.api.model.Quantity;
-import io.fabric8.kubernetes.api.model.ResourceRequirements;
-import io.fabric8.kubernetes.api.model.Service;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java
index 194cfa67..86271b40 100644
--- 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java
@@ -23,17 +23,17 @@ import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTest
 import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
 import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerPort;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ObjectMeta;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.OwnerReference;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodSpec;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Quantity;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ResourceRequirements;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
 import org.apache.flink.kubernetes.utils.Constants;
 
-import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.ContainerPort;
-import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.OwnerReference;
-import io.fabric8.kubernetes.api.model.PodSpec;
-import io.fabric8.kubernetes.api.model.Quantity;
-import io.fabric8.kubernetes.api.model.ResourceRequirements;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java
index c8fb5372..0bac2b15 100644
--- 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java
@@ -22,17 +22,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.EnvVar;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.LocalObjectReference;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder;
+import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ResourceRequirements;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
-import io.fabric8.kubernetes.api.model.Container;
-import io.fabric8.kubernetes.api.model.ContainerBuilder;
-import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
-import io.fabric8.kubernetes.api.model.EnvVar;
-import io.fabric8.kubernetes.api.model.EnvVarBuilder;
-import io.fabric8.kubernetes.api.model.LocalObjectReference;
-import io.fabric8.kubernetes.api.model.PodBuilder;
-import io.fabric8.kubernetes.api.model.ResourceRequirements;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
index 5a0d6a0e..84ba9ce6 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
@@ -27,11 +27,14 @@ import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorato
 import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
 import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
 import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Config;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.ConfigBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.concurrent.Executors;
 
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.BeforeEach;
@@ -40,28 +43,26 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.TEST_NAMESPACE;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link KubernetesStandaloneClusterDescriptor unit tests */
-@EnableKubernetesMockClient(crud = true)
+@EnableKubernetesMockClient(crud = true, https = false)
 public class KubernetesStandaloneClusterDescriptorTest {
-
+    KubernetesMockServer mockWebServer;
     private KubernetesStandaloneClusterDescriptor clusterDescriptor;
-    KubernetesMockServer mockServer;
-    private NamespacedKubernetesClient kubernetesClient;
     private FlinkStandaloneKubeClient flinkKubeClient;
     private Configuration flinkConfig = new Configuration();
 
     @BeforeEach
     public void setup() {
         flinkConfig = TestUtils.createTestFlinkConfig();
-        kubernetesClient = mockServer.createClient().inNamespace(TestUtils.TEST_NAMESPACE);
         flinkKubeClient =
                 new Fabric8FlinkStandaloneKubeClient(
-                        flinkConfig, kubernetesClient, Executors.newDirectExecutorService());
+                        flinkConfig, getClient(), Executors.newDirectExecutorService());
 
         clusterDescriptor = new KubernetesStandaloneClusterDescriptor(flinkConfig, flinkKubeClient);
     }
@@ -77,7 +78,7 @@ public class KubernetesStandaloneClusterDescriptorTest {
         var clusterClientProvider = clusterDescriptor.deploySessionCluster(clusterSpecification);
 
         List<Deployment> deployments =
-                kubernetesClient
+                getClient()
                         .apps()
                         .deployments()
                         .inNamespace(TestUtils.TEST_NAMESPACE)
@@ -134,7 +135,7 @@ public class KubernetesStandaloneClusterDescriptorTest {
                         ApplicationConfiguration.fromConfiguration(flinkConfig));
 
         List<Deployment> deployments =
-                kubernetesClient
+                getClient()
                         .apps()
                         .deployments()
                         .inNamespace(TestUtils.TEST_NAMESPACE)
@@ -185,7 +186,7 @@ public class KubernetesStandaloneClusterDescriptorTest {
                 clusterSpecification,
                 new ApplicationConfiguration(new String[] {"--test", "123"}, "test"));
         List<Deployment> deployments =
-                kubernetesClient
+                getClient()
                         .apps()
                         .deployments()
                         .inNamespace(TestUtils.TEST_NAMESPACE)
@@ -206,4 +207,13 @@ public class KubernetesStandaloneClusterDescriptorTest {
                 jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
                         .anyMatch(c -> c.getArgs().contains("123")));
     }
+
+    private NamespacedKubernetesClient getClient() {
+        var config =
+                new ConfigBuilder(Config.empty())
+                        .withMasterUrl(mockWebServer.url("/").toString())
+                        .withHttp2Disable(true)
+                        .build();
+        return new DefaultKubernetesClient(config).inNamespace(TEST_NAMESPACE);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/resources/log4j2-test.properties b/flink-kubernetes-standalone/src/test/resources/log4j2-test.properties
similarity index 98%
copy from flink-kubernetes-operator/src/test/resources/log4j2-test.properties
copy to flink-kubernetes-standalone/src/test/resources/log4j2-test.properties
index 3b7cec7a..47b66644 100644
--- a/flink-kubernetes-operator/src/test/resources/log4j2-test.properties
+++ b/flink-kubernetes-standalone/src/test/resources/log4j2-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-rootLogger.level = DEBUG
+rootLogger.level = INFO
 rootLogger.appenderRef.console.ref = ConsoleAppender
 
 # Log all infos to the console
diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml
index cc3c5b7a..38efd7dd 100644
--- a/flink-kubernetes-webhook/pom.xml
+++ b/flink-kubernetes-webhook/pom.xml
@@ -63,7 +63,6 @@ under the License.
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
-            <version>${junit.jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/pom.xml b/pom.xml
index 4cb02154..b1ec3820 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,19 +73,19 @@ under the License.
         <maven-javadoc-plugin.version>3.3.2</maven-javadoc-plugin.version>
         <git-commit-id-maven-plugin.version>5.0.0</git-commit-id-maven-plugin.version>
 
-        <operator.sdk.version>4.3.0</operator.sdk.version>
+        <operator.sdk.version>4.3.5</operator.sdk.version>
         <operator.sdk.admission-controller.version>0.2.0</operator.sdk.admission-controller.version>
+        <operator.sdk.jenvtest.version>0.9.1</operator.sdk.jenvtest.version>
 
-        <fabric8.version>6.5.1</fabric8.version>
+        <fabric8.version>6.7.0</fabric8.version>
 
         <lombok.version>1.18.22</lombok.version>
         <commons-lang3.version>3.12.0</commons-lang3.version>
         <commons-io.version>2.11.0</commons-io.version>
-        <flink.version>1.16.1</flink.version>
+        <flink.version>1.17.1</flink.version>
 
         <slf4j.version>1.7.36</slf4j.version>
         <log4j.version>2.17.1</log4j.version>
-        <junit.jupiter.version>5.8.2</junit.jupiter.version>
 
         <spotless.version>2.4.2</spotless.version>
         <it.skip>true</it.skip>
@@ -105,6 +105,13 @@ under the License.
                 <scope>import</scope>
                 <version>2.15.0</version>
             </dependency>
+            <dependency>
+                <groupId>org.junit</groupId>
+                <artifactId>junit-bom</artifactId>
+                <version>5.9.3</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 


Reply via email to