This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit ca848f9da9de9f075b0e12e00b057d6c8a85b771
Author: Usamah Jassat <[email protected]>
AuthorDate: Tue Apr 26 18:41:41 2022 +0100

    [FLINK-27443] Add k8s parameters and decorators for standalone JM and TM pods
---
 flink-kubernetes-shaded/pom.xml                    |  22 +++
 .../pom.xml                                        |  80 +++++-----
 .../CmdStandaloneJobManagerDecorator.java          |  84 ++++++++++
 .../CmdStandaloneTaskManagerDecorator.java         |  51 ++++++
 .../InitStandaloneTaskManagerDecorator.java        | 145 +++++++++++++++++
 .../decorators/UserLibMountDecorator.java          |  81 ++++++++++
 .../StandaloneKubernetesJobManagerParameters.java  |  83 ++++++++++
 .../StandaloneKubernetesTaskManagerParameters.java | 138 ++++++++++++++++
 .../StandaloneKubernetesConfigOptionsInternal.java |  48 ++++++
 .../operator/utils/StandaloneKubernetesUtils.java  |  64 ++++++++
 .../CmdStandaloneJobManagerDecoratorTest.java      |  82 ++++++++++
 .../CmdStandaloneTaskManagerDecoratorTest.java     |  64 ++++++++
 .../InitStandaloneTaskManagerDecoratorTest.java    | 176 +++++++++++++++++++++
 .../decorators/UserLibMountDecoratorTest.java      |  80 ++++++++++
 .../kubeclient/parameters/ParametersTestBase.java  | 145 +++++++++++++++++
 ...andaloneKubernetesJobManagerParametersTest.java | 105 ++++++++++++
 ...ndaloneKubernetesTaskManagerParametersTest.java | 159 +++++++++++++++++++
 .../operator/kubeclient/utils/TestUtils.java       |  92 +++++++++++
 pom.xml                                            |   6 +-
 19 files changed, 1667 insertions(+), 38 deletions(-)

diff --git a/flink-kubernetes-shaded/pom.xml b/flink-kubernetes-shaded/pom.xml
index 7fb94ef1..543043b3 100644
--- a/flink-kubernetes-shaded/pom.xml
+++ b/flink-kubernetes-shaded/pom.xml
@@ -43,6 +43,17 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes-standalone</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
@@ -50,6 +61,7 @@ under the License.
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
+                <version>3.3.0</version>
                 <executions>
                     <execution>
                         <id>shade-flink-operator</id>
@@ -69,6 +81,16 @@ under the License.
                                     
<shadedPattern>org.apache.flink.kubernetes.shaded.io.fabric8</shadedPattern>
                                 </relocation>
                             </relocations>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        
<exclude>META-INF/DEPENDENCIES</exclude>
+                                        <exclude>META-INF/LICENSE</exclude>
+                                        <exclude>META-INF/MANIFEST.MF</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/flink-kubernetes-shaded/pom.xml 
b/flink-kubernetes-standalone/pom.xml
similarity index 50%
copy from flink-kubernetes-shaded/pom.xml
copy to flink-kubernetes-standalone/pom.xml
index 7fb94ef1..f2a1c2f6 100644
--- a/flink-kubernetes-shaded/pom.xml
+++ b/flink-kubernetes-standalone/pom.xml
@@ -20,18 +20,27 @@ under the License.
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
+
     <parent>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-kubernetes-operator-parent</artifactId>
-      <version>1.2-SNAPSHOT</version>
-      <relativePath>..</relativePath>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.2-SNAPSHOT</version>
+        <relativePath>..</relativePath>
     </parent>
 
-    <artifactId>flink-kubernetes-shaded</artifactId>
-    <name>Flink Kubernetes Shaded</name>
+
+
+    <artifactId>flink-kubernetes-standalone</artifactId>
+    <name>Flink Kubernetes Standalone</name>
     <packaging>jar</packaging>
 
     <dependencies>
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-client</artifactId>
+            <version>${fabric8.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-kubernetes</artifactId>
@@ -43,36 +52,33 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-    </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>shade-flink-operator</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <artifactSet>
-                                <includes combine.children="append">
-                                    <include>*:*</include>
-                                </includes>
-                            </artifactSet>
-                            <relocations>
-                                <relocation>
-                                    <pattern>io.fabric8</pattern>
-                                    
<shadedPattern>org.apache.flink.kubernetes.shaded.io.fabric8</shadedPattern>
-                                </relocation>
-                            </relocations>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Test -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>${junit.jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>${hamcrest.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-server-mock</artifactId>
+            <version>${fabric8.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
new file mode 100644
index 00000000..430abe2e
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.decorators;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
+import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Attach the command and args to the main container for running the 
JobManager in standalone mode.
+ */
+public class CmdStandaloneJobManagerDecorator extends 
AbstractKubernetesStepDecorator {
+
+    public static final String JOBMANAGER_ENTRYPOINT_ARG = "jobmanager";
+    public static final String APPLICATION_MODE_ARG = "standalone-job";
+
+    private final StandaloneKubernetesJobManagerParameters 
kubernetesJobManagerParameters;
+
+    public CmdStandaloneJobManagerDecorator(
+            StandaloneKubernetesJobManagerParameters 
kubernetesJobManagerParameters) {
+        this.kubernetesJobManagerParameters = 
checkNotNull(kubernetesJobManagerParameters);
+    }
+
+    @Override
+    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
+        final Container mainContainerWithStartCmd;
+        if (kubernetesJobManagerParameters.isApplicationCluster()) {
+            mainContainerWithStartCmd = 
decorateApplicationContainer(flinkPod.getMainContainer());
+        } else {
+            mainContainerWithStartCmd = 
decorateSessionContainer(flinkPod.getMainContainer());
+        }
+        return new 
FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build();
+    }
+
+    private Container decorateSessionContainer(Container mainContainer) {
+        return new ContainerBuilder(mainContainer)
+                
.withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
+                .withArgs(JOBMANAGER_ENTRYPOINT_ARG)
+                .build();
+    }
+
+    private Container decorateApplicationContainer(Container mainContainer) {
+        return new ContainerBuilder(mainContainer)
+                
.withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
+                .withArgs(getApplicationClusterArgs())
+                .build();
+    }
+
+    private List<String> getApplicationClusterArgs() {
+        List<String> args = new ArrayList<>();
+        args.add(APPLICATION_MODE_ARG);
+
+        String mainClass = kubernetesJobManagerParameters.getMainClass();
+        if (mainClass != null) {
+            args.add("--job-classname");
+            args.add(mainClass);
+        }
+
+        return args;
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java
new file mode 100644
index 00000000..1533e0b2
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.decorators;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
+import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Attach the command and args to the main container for running the 
TaskManager in standalone mode.
+ */
+public class CmdStandaloneTaskManagerDecorator extends 
AbstractKubernetesStepDecorator {
+    public static final String TASKMANAGER_ENTRYPOINT_ARG = "taskmanager";
+
+    private final StandaloneKubernetesTaskManagerParameters 
kubernetesTaskManagerParameters;
+
+    public CmdStandaloneTaskManagerDecorator(
+            StandaloneKubernetesTaskManagerParameters 
kubernetesTaskManagerParameters) {
+        this.kubernetesTaskManagerParameters = 
checkNotNull(kubernetesTaskManagerParameters);
+    }
+
+    @Override
+    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
+        final Container mainContainerWithStartCmd =
+                new ContainerBuilder(flinkPod.getMainContainer())
+                        
.withCommand(kubernetesTaskManagerParameters.getContainerEntrypoint())
+                        .withArgs(TASKMANAGER_ENTRYPOINT_ARG)
+                        .build();
+        return new 
FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build();
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java
new file mode 100644
index 00000000..adbc1a95
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.decorators;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesToleration;
+import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An initializer for the TaskManager {@link 
org.apache.flink.kubernetes.kubeclient.FlinkPod} in
+ * standalone mode.
+ */
+public class InitStandaloneTaskManagerDecorator extends 
AbstractKubernetesStepDecorator {
+    private final StandaloneKubernetesTaskManagerParameters 
kubernetesTaskManagerParameters;
+
+    public InitStandaloneTaskManagerDecorator(
+            StandaloneKubernetesTaskManagerParameters 
kubernetesTaskManagerParameters) {
+        this.kubernetesTaskManagerParameters = 
checkNotNull(kubernetesTaskManagerParameters);
+    }
+
+    @Override
+    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
+        final PodBuilder basicPodBuilder = new 
PodBuilder(flinkPod.getPodWithoutMainContainer());
+
+        // Overwrite fields
+        final String serviceAccountName = 
kubernetesTaskManagerParameters.getServiceAccount();
+
+        basicPodBuilder
+                .withApiVersion(Constants.API_VERSION)
+                .editOrNewSpec()
+                .withServiceAccount(serviceAccountName)
+                .withServiceAccountName(serviceAccountName)
+                .endSpec();
+
+        // Merge fields
+        basicPodBuilder
+                .editOrNewMetadata()
+                .addToLabels(kubernetesTaskManagerParameters.getLabels())
+                
.addToAnnotations(kubernetesTaskManagerParameters.getAnnotations())
+                .endMetadata()
+                .editOrNewSpec()
+                
.addToImagePullSecrets(kubernetesTaskManagerParameters.getImagePullSecrets())
+                
.addToNodeSelector(kubernetesTaskManagerParameters.getNodeSelector())
+                .addAllToTolerations(
+                        
kubernetesTaskManagerParameters.getTolerations().stream()
+                                .map(e -> 
KubernetesToleration.fromMap(e).getInternalResource())
+                                .collect(Collectors.toList()))
+                .endSpec();
+
+        final Container basicMainContainer = 
decorateMainContainer(flinkPod.getMainContainer());
+
+        return new FlinkPod.Builder(flinkPod)
+                .withPod(basicPodBuilder.build())
+                .withMainContainer(basicMainContainer)
+                .build();
+    }
+
+    private Container decorateMainContainer(Container container) {
+        final ContainerBuilder mainContainerBuilder = new 
ContainerBuilder(container);
+
+        // Overwrite fields
+        final ResourceRequirements requirementsInPodTemplate =
+                container.getResources() == null
+                        ? new ResourceRequirements()
+                        : container.getResources();
+        final ResourceRequirements resourceRequirements =
+                KubernetesUtils.getResourceRequirements(
+                        requirementsInPodTemplate,
+                        
kubernetesTaskManagerParameters.getTaskManagerMemoryMB(),
+                        kubernetesTaskManagerParameters.getMemoryLimitFactor(),
+                        kubernetesTaskManagerParameters.getTaskManagerCPU(),
+                        kubernetesTaskManagerParameters.getCpuLimitFactor(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+        final String image = kubernetesTaskManagerParameters.getImage();
+        final String imagePullPolicy = 
kubernetesTaskManagerParameters.getImagePullPolicy().name();
+        mainContainerBuilder
+                .withName(Constants.MAIN_CONTAINER_NAME)
+                .withImage(image)
+                .withImagePullPolicy(imagePullPolicy)
+                .withResources(resourceRequirements);
+
+        // Merge fields
+        mainContainerBuilder
+                .addToPorts(
+                        new ContainerPortBuilder()
+                                .withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
+                                
.withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
+                                .build())
+                .addAllToEnv(getCustomizedEnvs());
+        getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv);
+
+        return mainContainerBuilder.build();
+    }
+
+    private List<EnvVar> getCustomizedEnvs() {
+        return 
kubernetesTaskManagerParameters.getEnvironments().entrySet().stream()
+                .map(
+                        kv ->
+                                new EnvVarBuilder()
+                                        .withName(kv.getKey())
+                                        .withValue(kv.getValue())
+                                        .build())
+                .collect(Collectors.toList());
+    }
+
+    private Optional<EnvVar> getFlinkLogDirEnv() {
+        return kubernetesTaskManagerParameters
+                .getFlinkLogDirInPod()
+                .map(logDir -> new EnvVar(Constants.ENV_FLINK_LOG_DIR, logDir, 
null));
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
new file mode 100644
index 00000000..a66026b2
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.decorators;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
+import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Mount the Flink User Lib directory to enable Flink to pick up JARs defined in
+ * pipeline.classpaths. Used for starting standalone application clusters.
+ */
+public class UserLibMountDecorator extends AbstractKubernetesStepDecorator {
+    private static final String USER_LIB_VOLUME = "user-lib-dir";
+    private static final String USER_LIB_PATH = "/opt/flink/usrlib";
+
+    private final StandaloneKubernetesJobManagerParameters 
kubernetesJobManagerParameters;
+
+    public UserLibMountDecorator(
+            StandaloneKubernetesJobManagerParameters 
kubernetesJobManagerParameters) {
+        this.kubernetesJobManagerParameters = 
checkNotNull(kubernetesJobManagerParameters);
+    }
+
+    @Override
+    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
+        if (!kubernetesJobManagerParameters.isApplicationCluster()) {
+            return flinkPod;
+        }
+
+        final Volume userLibVolume =
+                new VolumeBuilder()
+                        .withName(USER_LIB_VOLUME)
+                        .withNewEmptyDir()
+                        .endEmptyDir()
+                        .build();
+
+        final Pod pod =
+                new PodBuilder(flinkPod.getPodWithoutMainContainer())
+                        .editSpec()
+                        .addNewVolumeLike(userLibVolume)
+                        .endVolume()
+                        .endSpec()
+                        .build();
+
+        final Container mountedMainContainer =
+                new ContainerBuilder(flinkPod.getMainContainer())
+                        .addNewVolumeMount()
+                        .withName(USER_LIB_VOLUME)
+                        .withMountPath(USER_LIB_PATH)
+                        .endVolumeMount()
+                        .build();
+        return new FlinkPod.Builder(flinkPod)
+                .withPod(pod)
+                .withMainContainer(mountedMainContainer)
+                .build();
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
new file mode 100644
index 00000000..bdc28d48
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.parameters;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import 
org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class that helps to parse, verify and manage the Kubernetes parameters that are used
+ * for constructing the JobManager deployment used for standalone cluster 
deployments.
+ */
+public class StandaloneKubernetesJobManagerParameters extends 
KubernetesJobManagerParameters {
+
+    public StandaloneKubernetesJobManagerParameters(
+            Configuration flinkConfig, ClusterSpecification 
clusterSpecification) {
+        super(flinkConfig, clusterSpecification);
+    }
+
+    @Override
+    public Map<String, String> getLabels() {
+        final Map<String, String> labels = new HashMap<>();
+        labels.putAll(getSelectors());
+        labels.putAll(
+                flinkConfig
+                        
.getOptional(KubernetesConfigOptions.JOB_MANAGER_LABELS)
+                        .orElse(Collections.emptyMap()));
+        return Collections.unmodifiableMap(labels);
+    }
+
+    @Override
+    public Map<String, String> getSelectors() {
+        return Collections.unmodifiableMap(
+                
StandaloneKubernetesUtils.getJobManagerSelectors(getClusterId()));
+    }
+
+    @Override
+    public Map<String, String> getCommonLabels() {
+        return Collections.unmodifiableMap(
+                StandaloneKubernetesUtils.getCommonLabels(getClusterId()));
+    }
+
+    @Override
+    public boolean isInternalServiceEnabled() {
+        return true;
+    }
+
+    public boolean isApplicationCluster() {
+        return flinkConfig
+                .get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE)
+                
.equals(StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+    }
+
+    public String getMainClass() {
+        if (!isApplicationCluster()) {
+            return null;
+        }
+        return 
flinkConfig.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS);
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
new file mode 100644
index 00000000..692614bf
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.parameters;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+import 
org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Parameter holder for the TaskManager deployment of a standalone Flink cluster on Kubernetes. It
+ * reads the relevant values from the Flink configuration and the cluster specification, validates
+ * some of them, and exposes them through typed getters.
+ */
+public class StandaloneKubernetesTaskManagerParameters extends AbstractKubernetesParameters {
+    private final ClusterSpecification clusterSpecification;
+
+    /**
+     * Creates the parameters.
+     *
+     * @param flinkConfig the Flink configuration the values are read from
+     * @param clusterSpecification the specification providing the TaskManager memory
+     */
+    public StandaloneKubernetesTaskManagerParameters(
+            Configuration flinkConfig, ClusterSpecification clusterSpecification) {
+        super(flinkConfig);
+        this.clusterSpecification = clusterSpecification;
+    }
+
+    /**
+     * Returns the user-configured TaskManager labels overlaid with the TaskManager selectors; on a
+     * key clash the selector value wins.
+     *
+     * @return an unmodifiable map holding the merged labels
+     */
+    @Override
+    public Map<String, String> getLabels() {
+        final Map<String, String> userLabels =
+                flinkConfig
+                        .getOptional(KubernetesConfigOptions.TASK_MANAGER_LABELS)
+                        .orElse(Collections.emptyMap());
+        final Map<String, String> merged = new HashMap<>(userLabels);
+        merged.putAll(getSelectors());
+        return Collections.unmodifiableMap(merged);
+    }
+
+    /** Returns the selector labels that identify the TaskManagers of this cluster. */
+    @Override
+    public Map<String, String> getSelectors() {
+        final String clusterId = getClusterId();
+        return StandaloneKubernetesUtils.getTaskManagerSelectors(clusterId);
+    }
+
+    /**
+     * Returns the configured TaskManager node selector.
+     *
+     * @return an unmodifiable map, empty when the option is not set
+     */
+    @Override
+    public Map<String, String> getNodeSelector() {
+        final Map<String, String> nodeSelector =
+                flinkConfig
+                        .getOptional(KubernetesConfigOptions.TASK_MANAGER_NODE_SELECTOR)
+                        .orElse(Collections.emptyMap());
+        return Collections.unmodifiableMap(nodeSelector);
+    }
+
+    /**
+     * Returns the environment variables contributed by these parameters, which is always a new
+     * empty map.
+     */
+    @Override
+    public Map<String, String> getEnvironments() {
+        // Nothing is contributed here: the TaskManager environment comes from the pod template.
+        return new HashMap<>();
+    }
+
+    /** Returns the configured TaskManager annotations, or an empty map when none are set. */
+    @Override
+    public Map<String, String> getAnnotations() {
+        final Map<String, String> annotations =
+                flinkConfig
+                        .getOptional(KubernetesConfigOptions.TASK_MANAGER_ANNOTATIONS)
+                        .orElse(Collections.emptyMap());
+        return annotations;
+    }
+
+    /** Returns the configured TaskManager tolerations, or an empty list when none are set. */
+    @Override
+    public List<Map<String, String>> getTolerations() {
+        final List<Map<String, String>> tolerations =
+                flinkConfig
+                        .getOptional(KubernetesConfigOptions.TASK_MANAGER_TOLERATIONS)
+                        .orElse(Collections.emptyList());
+        return tolerations;
+    }
+
+    /**
+     * Returns the number of TaskManager replicas.
+     *
+     * @return the configured replica count, always at least 1
+     * @throws IllegalArgumentException if the configured value is lower than 1
+     */
+    public int getReplicas() {
+        final int configuredReplicas =
+                flinkConfig.get(
+                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
+        Preconditions.checkArgument(
+                configuredReplicas >= 1,
+                "'%s' should not be configured less than 1.",
+                StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS.key());
+        return configuredReplicas;
+    }
+
+    /** Returns the service account configured for the TaskManager pods. */
+    public String getServiceAccount() {
+        final String serviceAccount =
+                flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT);
+        return serviceAccount;
+    }
+
+    /** Returns the TaskManager memory, in MB, as given by the cluster specification. */
+    public int getTaskManagerMemoryMB() {
+        return clusterSpecification.getTaskManagerMemoryMB();
+    }
+
+    /** Returns the CPU value configured for the TaskManager. */
+    public double getTaskManagerCPU() {
+        return flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU);
+    }
+
+    /**
+     * Returns the TaskManager RPC port parsed from the configuration.
+     *
+     * @return the RPC port, always greater than 0
+     * @throws IllegalArgumentException if the parsed port is not greater than 0
+     */
+    public int getRPCPort() {
+        final int rpcPort = KubernetesUtils.parsePort(flinkConfig, TaskManagerOptions.RPC_PORT);
+        Preconditions.checkArgument(
+                rpcPort > 0, "%s should not be 0.", TaskManagerOptions.RPC_PORT.key());
+        return rpcPort;
+    }
+
+    /**
+     * Returns the TaskManager pod template file, if one is configured.
+     *
+     * @return the template path wrapped as a {@link File}, or an empty optional when the option
+     *     is not set
+     */
+    public Optional<File> getPodTemplateFilePath() {
+        return flinkConfig
+                .getOptional(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)
+                .map(templatePath -> new File(templatePath));
+    }
+
+    /** Returns the configured TaskManager memory limit factor. */
+    public double getMemoryLimitFactor() {
+        final double memoryLimitFactor =
+                flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_MEMORY_LIMIT_FACTOR);
+        return memoryLimitFactor;
+    }
+
+    /** Returns the configured TaskManager CPU limit factor. */
+    public double getCpuLimitFactor() {
+        final double cpuLimitFactor =
+                flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR);
+        return cpuLimitFactor;
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/StandaloneKubernetesConfigOptionsInternal.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/StandaloneKubernetesConfigOptionsInternal.java
new file mode 100644
index 00000000..bb1af0e9
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/StandaloneKubernetesConfigOptionsInternal.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.standalone;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Internal configuration options used by the Flink operator when it deploys Flink clusters in
+ * standalone mode.
+ */
+public class StandaloneKubernetesConfigOptionsInternal {
+    /** Number of pods in the TaskManager pool; defaults to 1. */
+    public static final ConfigOption<Integer> KUBERNETES_TASKMANAGER_REPLICAS =
+            key("kubernetes.internal.taskmanager.replicas")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Specify how many pods will be in the TaskManager 
pool. For "
+                                    + "standalone kubernetes Flink sessions 
clusters.");
+
+    /** Mode the cluster is deployed in; defaults to {@link ClusterMode#APPLICATION}. */
+    public static final ConfigOption<ClusterMode> CLUSTER_MODE =
+            key("kubernetes.internal.cluster-mode")
+                    .enumType(ClusterMode.class)
+                    .defaultValue(ClusterMode.APPLICATION)
+                    .withDescription("Specify what mode the cluster will be 
deployed in.");
+
+    /** The different modes that a Flink cluster can be deployed in. */
+    public enum ClusterMode {
+        APPLICATION,
+        SESSION
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java
new file mode 100644
index 00000000..ca801e4c
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.utils.Constants;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static helpers that derive the resource names and label sets used for the Kubernetes resources
+ * of a standalone Flink cluster.
+ */
+public class StandaloneKubernetesUtils {
+
+    /** Value stored under {@link Constants#LABEL_TYPE_KEY} for standalone clusters. */
+    public static final String LABEL_TYPE_STANDALONE_TYPE = "flink-standalone-kubernetes";
+
+    private static final String TM_DEPLOYMENT_POSTFIX = "-taskmanager";
+
+    /**
+     * Returns the TaskManager deployment name for the given cluster.
+     *
+     * @param clusterId the cluster id
+     * @return the cluster id followed by {@code -taskmanager}
+     */
+    public static String getTaskManagerDeploymentName(String clusterId) {
+        return clusterId + TM_DEPLOYMENT_POSTFIX;
+    }
+
+    /**
+     * Returns the JobManager deployment name for the given cluster.
+     *
+     * @param clusterId the cluster id
+     * @return the cluster id itself
+     */
+    public static String getJobManagerDeploymentName(String clusterId) {
+        return clusterId;
+    }
+
+    /**
+     * Builds the labels shared by all resources of a cluster: the standalone type label and the
+     * app label carrying the cluster id.
+     *
+     * @param clusterId the cluster id
+     * @return a new, mutable map owned by the caller
+     */
+    public static Map<String, String> getCommonLabels(String clusterId) {
+        final Map<String, String> commonLabels = new HashMap<>();
+        commonLabels.put(Constants.LABEL_TYPE_KEY, LABEL_TYPE_STANDALONE_TYPE);
+        commonLabels.put(Constants.LABEL_APP_KEY, clusterId);
+        return commonLabels;
+    }
+
+    /**
+     * Builds the selector labels for the TaskManagers: the common labels plus the TaskManager
+     * component label.
+     *
+     * @param clusterId the cluster id
+     * @return an unmodifiable map of selector labels
+     */
+    public static Map<String, String> getTaskManagerSelectors(String clusterId) {
+        // getCommonLabels hands out a fresh map on every call, so adding to it is safe.
+        final Map<String, String> labels = getCommonLabels(clusterId);
+        labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_TASK_MANAGER);
+        return Collections.unmodifiableMap(labels);
+    }
+
+    /**
+     * Builds the selector labels for the JobManager: the common labels plus the JobManager
+     * component label.
+     *
+     * @param clusterId the cluster id
+     * @return an unmodifiable map of selector labels
+     */
+    public static Map<String, String> getJobManagerSelectors(String clusterId) {
+        final Map<String, String> labels = getCommonLabels(clusterId);
+        labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER);
+        return Collections.unmodifiableMap(labels);
+    }
+
+    /**
+     * Builds the labels for a config map: the common labels plus a {@code configmap-type} entry.
+     *
+     * @param clusterId the cluster id
+     * @param type the value stored under {@code configmap-type}
+     * @return an unmodifiable map of labels
+     */
+    public static Map<String, String> getConfigMapLabels(String clusterId, String type) {
+        final Map<String, String> labels = new HashMap<>(getCommonLabels(clusterId));
+        labels.put("configmap-type", type);
+        return Collections.unmodifiableMap(labels);
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
new file mode 100644
index 00000000..49355574
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.decorators;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import 
org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Unit tests for {@link CmdStandaloneJobManagerDecorator}: each test decorates an empty pod and
+ * checks the command and args of the main container for one cluster mode.
+ *
+ * <p>NOTE(review): the tests set the cluster mode on the configuration after the decorator has
+ * been built in {@code setup()}, so they presumably rely on the parameters reading that same
+ * {@link Configuration} instance at decoration time - keep this ordering in mind when refactoring.
+ */
+public class CmdStandaloneJobManagerDecoratorTest {
+
+    private static final String MOCK_ENTRYPATH = "./docker-entrypath";
+
+    private StandaloneKubernetesJobManagerParameters jmParameters;
+    private Configuration configuration;
+    private CmdStandaloneJobManagerDecorator decorator;
+
+    @BeforeEach
+    public void setup() {
+        configuration = new Configuration();
+        configuration.setString(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH, 
MOCK_ENTRYPATH);
+        jmParameters =
+                new StandaloneKubernetesJobManagerParameters(
+                        configuration,
+                        new ClusterSpecification.ClusterSpecificationBuilder()
+                                .createClusterSpecification());
+
+        decorator = new CmdStandaloneJobManagerDecorator(jmParameters);
+    }
+
+    @Test
+    public void testSessionCommandAdded() {
+        configuration.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+
+        FlinkPod decoratedPod = decorator.decorateFlinkPod(new 
FlinkPod.Builder().build());
+        assertThat(
+                decoratedPod.getMainContainer().getCommand(), 
containsInAnyOrder(MOCK_ENTRYPATH));
+        assertThat(
+                decoratedPod.getMainContainer().getArgs(),
+                
containsInAnyOrder(CmdStandaloneJobManagerDecorator.JOBMANAGER_ENTRYPOINT_ARG));
+    }
+
+    @Test
+    public void testApplicationCommandAdded() {
+        configuration.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                
StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+
+        FlinkPod decoratedPod = decorator.decorateFlinkPod(new 
FlinkPod.Builder().build());
+        assertThat(
+                decoratedPod.getMainContainer().getCommand(), 
containsInAnyOrder(MOCK_ENTRYPATH));
+        assertThat(
+                decoratedPod.getMainContainer().getArgs(),
+                
containsInAnyOrder(CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG));
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecoratorTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecoratorTest.java
new file mode 100644
index 00000000..09d623c0
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecoratorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.decorators;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Unit tests for {@link CmdStandaloneTaskManagerDecorator}: decorates an empty pod and checks
+ * that the main container gets the configured entry path as command and the TaskManager
+ * entrypoint argument as args.
+ */
+public class CmdStandaloneTaskManagerDecoratorTest {
+
+    private static final String MOCK_ENTRYPATH = "./docker-entrypath";
+
+    private StandaloneKubernetesTaskManagerParameters tmParameters;
+    private Configuration configuration;
+    private CmdStandaloneTaskManagerDecorator decorator;
+
+    @BeforeEach
+    public void setup() {
+        configuration = new Configuration();
+        configuration.setString(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH, 
MOCK_ENTRYPATH);
+        tmParameters =
+                new StandaloneKubernetesTaskManagerParameters(
+                        configuration,
+                        new ClusterSpecification.ClusterSpecificationBuilder()
+                                .createClusterSpecification());
+
+        decorator = new CmdStandaloneTaskManagerDecorator(tmParameters);
+    }
+
+    @Test
+    public void testCommandAdded() {
+        FlinkPod decoratedPod = decorator.decorateFlinkPod(new 
FlinkPod.Builder().build());
+
+        assertThat(
+                decoratedPod.getMainContainer().getCommand(), 
containsInAnyOrder(MOCK_ENTRYPATH));
+        assertThat(
+                decoratedPod.getMainContainer().getArgs(),
+                
containsInAnyOrder(CmdStandaloneTaskManagerDecorator.TASKMANAGER_ENTRYPOINT_ARG));
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java
new file mode 100644
index 00000000..36483adf
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.decorators;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTestBase;
+import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerPort;
+import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.LocalObjectReference;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link InitStandaloneTaskManagerDecorator}.
+ *
+ * <p>{@code setup()} decorates a pod template once and stores the resulting pod and main
+ * container; every test then inspects a single aspect of that result (image, resources, ports,
+ * labels, annotations, service account, pull secrets, node selector, environment). The expected
+ * values ({@code userLabels}, {@code templateLabels}, ...) and the helpers {@code
+ * setupFlinkConfig()} and {@code createPodTemplate()} are not defined in this class; presumably
+ * they are inherited from {@link ParametersTestBase}.
+ */
+public class InitStandaloneTaskManagerDecoratorTest extends ParametersTestBase 
{
+
+    private Pod resultPod;
+    private Container resultMainContainer;
+
+    @BeforeEach
+    public void setup() {
+        setupFlinkConfig();
+
+        ClusterSpecification clusterSpecification = 
TestUtils.createClusterSpecification();
+        StandaloneKubernetesTaskManagerParameters 
kubernetesTaskManagerParameters =
+                new StandaloneKubernetesTaskManagerParameters(flinkConfig, 
clusterSpecification);
+        InitStandaloneTaskManagerDecorator decorator =
+                new 
InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters);
+
+        final FlinkPod resultFlinkPod = 
decorator.decorateFlinkPod(createPodTemplate());
+        resultPod = resultFlinkPod.getPodWithoutMainContainer();
+        resultMainContainer = resultFlinkPod.getMainContainer();
+    }
+
+    @Test
+    public void testOverrideApiVersion() {
+        assertEquals(Constants.API_VERSION, resultPod.getApiVersion());
+    }
+
+    @Test
+    public void testMainContainerName() {
+        assertEquals(Constants.MAIN_CONTAINER_NAME, 
resultMainContainer.getName());
+    }
+
+    @Test
+    public void testOverrideMainContainerImage() {
+        assertEquals(TestUtils.IMAGE, resultMainContainer.getImage());
+    }
+
+    @Test
+    public void testOverrideMainContainerImagePullPolicy() {
+        assertEquals(TestUtils.IMAGE_POLICY, 
resultMainContainer.getImagePullPolicy());
+    }
+
+    @Test
+    public void testOverrideMainContainerResourceRequirements() {
+        final ResourceRequirements resourceRequirements = 
resultMainContainer.getResources();
+
+        final Map<String, Quantity> requests = 
resourceRequirements.getRequests();
+        assertEquals(Double.toString(TestUtils.TASK_MANAGER_CPU), 
requests.get("cpu").getAmount());
+        assertEquals(
+                String.valueOf(TestUtils.TASK_MANAGER_MEMORY_MB),
+                requests.get("memory").getAmount());
+
+        final Map<String, Quantity> limits = resourceRequirements.getLimits();
+        assertEquals(Double.toString(TestUtils.TASK_MANAGER_CPU), 
limits.get("cpu").getAmount());
+        assertEquals(
+                String.valueOf(TestUtils.TASK_MANAGER_MEMORY_MB), 
limits.get("memory").getAmount());
+    }
+
+    @Test
+    public void testMergeMainContainerPorts() {
+        final List<ContainerPort> expectedContainerPorts =
+                Arrays.asList(
+                        new ContainerPortBuilder()
+                                .withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
+                                
.withContainerPort(Constants.TASK_MANAGER_RPC_PORT)
+                                .build(),
+                        new ContainerPortBuilder()
+                                .withName(TEMPLATE_PORT_NAME)
+                                .withContainerPort(TEMPLATE_PORT)
+                                .build());
+
+        assertThat(
+                resultMainContainer.getPorts(),
+                containsInAnyOrder(expectedContainerPorts.toArray()));
+    }
+
+    @Test
+    public void testMergePodLabels() {
+        final Map<String, String> expectedLabels =
+                new HashMap<>(
+                        
StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID));
+        expectedLabels.putAll(userLabels);
+        expectedLabels.putAll(templateLabels);
+
+        assertEquals(expectedLabels, resultPod.getMetadata().getLabels());
+    }
+
+    @Test
+    public void testMergePodAnnotations() {
+        final Map<String, String> expectedAnnotations = new 
HashMap<>(userAnnotations);
+        expectedAnnotations.putAll(templateAnnotations);
+        assertEquals(expectedAnnotations, 
resultPod.getMetadata().getAnnotations());
+    }
+
+    @Test
+    public void testOverridePodServiceAccountName() {
+        assertEquals(TestUtils.SERVICE_ACCOUNT, 
resultPod.getSpec().getServiceAccountName());
+    }
+
+    @Test
+    public void testMergeImagePullSecrets() {
+        final List<String> resultSecrets =
+                resultPod.getSpec().getImagePullSecrets().stream()
+                        .map(LocalObjectReference::getName)
+                        .collect(Collectors.toList());
+        final List<String> expectedImagePullSecrets = new 
ArrayList<>(userImagePullSecrets);
+        expectedImagePullSecrets.addAll(templateImagePullSecrets);
+
+        assertThat(resultSecrets, 
containsInAnyOrder(expectedImagePullSecrets.toArray()));
+    }
+
+    @Test
+    public void testMergeNodeSelector() {
+        final Map<String, String> expectedNodeSelectors = new 
HashMap<>(userNodeSelectors);
+        expectedNodeSelectors.putAll(templateNodeSelector);
+        assertEquals(expectedNodeSelectors, 
resultPod.getSpec().getNodeSelector());
+    }
+
+    @Test
+    public void testEnvs() {
+        final List<EnvVar> envVars = resultMainContainer.getEnv();
+
+        final Map<String, String> envs = new HashMap<>();
+        envVars.forEach(env -> envs.put(env.getName(), env.getValue()));
+
+        assertEquals(templateEnvs, envs);
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
new file mode 100644
index 00000000..07c8af20
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.decorators;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import 
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import 
org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link UserLibMountDecorator}: in application mode the decorated pod is expected
+ * to gain exactly one volume and one volume mount at {@code /opt/flink/usrlib}; in session mode
+ * the pod is expected to stay without volumes and volume mounts.
+ */
+public class UserLibMountDecoratorTest {
+
+    @Test
+    public void testVolumeAdded() {
+        StandaloneKubernetesJobManagerParameters jmParameters =
+                createJmParamsWithClusterMode(
+                        
StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+
+        UserLibMountDecorator decorator = new 
UserLibMountDecorator(jmParameters);
+
+        FlinkPod baseFlinkPod = new FlinkPod.Builder().build();
+
+        assertEquals(0, 
baseFlinkPod.getMainContainer().getVolumeMounts().size());
+        assertEquals(0, 
baseFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes().size());
+
+        FlinkPod decoratedPod = decorator.decorateFlinkPod(baseFlinkPod);
+        assertEquals(1, 
decoratedPod.getMainContainer().getVolumeMounts().size());
+        assertEquals(1, 
decoratedPod.getPodWithoutMainContainer().getSpec().getVolumes().size());
+
+        VolumeMount volumeMount = 
decoratedPod.getMainContainer().getVolumeMounts().get(0);
+
+        assertEquals("/opt/flink/usrlib", volumeMount.getMountPath());
+    }
+
+    @Test
+    public void testVolumeNotAdded() {
+        StandaloneKubernetesJobManagerParameters jmParameters =
+                createJmParamsWithClusterMode(
+                        
StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+        UserLibMountDecorator decorator = new 
UserLibMountDecorator(jmParameters);
+
+        FlinkPod baseFlinkPod = new FlinkPod.Builder().build();
+        assertEquals(0, 
baseFlinkPod.getMainContainer().getVolumeMounts().size());
+        assertEquals(0, 
baseFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes().size());
+
+        FlinkPod decoratedPod = decorator.decorateFlinkPod(baseFlinkPod);
+        assertEquals(0, 
decoratedPod.getMainContainer().getVolumeMounts().size());
+        assertEquals(0, 
decoratedPod.getPodWithoutMainContainer().getSpec().getVolumes().size());
+    }
+
+    private StandaloneKubernetesJobManagerParameters 
createJmParamsWithClusterMode(
+            StandaloneKubernetesConfigOptionsInternal.ClusterMode clusterMode) 
{
+        return new StandaloneKubernetesJobManagerParameters(
+                new Configuration()
+                        
.set(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, clusterMode),
+                new ClusterSpecification.ClusterSpecificationBuilder()
+                        .createClusterSpecification());
+    }
+}
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java
new file mode 100644
index 00000000..c3ffbafb
--- /dev/null
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.parameters;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.LocalObjectReference;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Base class for Kubernetes parameters tests; holds the shared Flink config and pod fixtures. */
+public class ParametersTestBase {
+
+    protected Configuration flinkConfig;
+
+    protected ClusterSpecification clusterSpecification = 
TestUtils.createClusterSpecification();
+
+    protected final Map<String, String> userLabels =
+            TestUtils.generateTestStringStringMap("label", "value", 2);
+
+    protected final Map<String, String> userAnnotations =
+            TestUtils.generateTestStringStringMap("annotation", "value", 2);
+
+    protected final Map<String, String> userNodeSelectors =
+            TestUtils.generateTestStringStringMap("selector", "val", 2);
+
+    protected final List<String> userImagePullSecrets = Arrays.asList("s1", 
"s2", "s3");
+
+    protected static final String TEMPLATE_PORT_NAME = "user-port";
+    protected static final int TEMPLATE_PORT = 9458;
+
+    protected final Map<String, String> templateEnvs =
+            TestUtils.generateTestStringStringMap("TEMPLATE_KEY", "VAL", 2);
+
+    protected final Map<String, String> templateLabels =
+            TestUtils.generateTestStringStringMap("template-label", "value", 
2);
+
+    protected final Map<String, String> templateAnnotations =
+            TestUtils.generateTestStringStringMap("template-annotation", 
"value", 2);
+
+    protected final Map<String, String> templateNodeSelector =
+            TestUtils.generateTestStringStringMap("template-node-selector", 
"value", 2);
+
+    protected final List<String> templateImagePullSecrets = 
Arrays.asList("ts1", "ts2", "ts3");
+
+    private static final String SECRETS = "ssl-cert:/etc/ssl";
+
+    protected FlinkPod createPodTemplate() {
+        List<EnvVar> envVars = new ArrayList<>();
+        templateEnvs.forEach(
+                (k, v) -> envVars.add(new 
EnvVarBuilder().withName(k).withValue(v).build()));
+
+        Container mainContainer =
+                new ContainerBuilder()
+                        .withImagePullPolicy("templatePullPolicy")
+                        .withImage("templateImage")
+                        .withResources(
+                                KubernetesUtils.getResourceRequirements(
+                                        new ResourceRequirements(),
+                                        1234,
+                                        1,
+                                        102,
+                                        1,
+                                        Collections.emptyMap(),
+                                        Collections.emptyMap()))
+                        .withPorts(
+                                new ContainerPortBuilder()
+                                        .withName(TEMPLATE_PORT_NAME)
+                                        .withContainerPort(TEMPLATE_PORT)
+                                        .build())
+                        .withEnv(envVars)
+                        .build();
+
+        return new FlinkPod.Builder()
+                .withMainContainer(mainContainer)
+                .withPod(
+                        new PodBuilder()
+                                .withApiVersion("templateAPIVersion")
+                                .editOrNewSpec()
+                                
.withServiceAccountName("templateServiceAccountName")
+                                .withServiceAccount("templateServiceAccount")
+                                .endSpec()
+                                .editOrNewMetadata()
+                                .addToLabels(templateLabels)
+                                .addToAnnotations(templateAnnotations)
+                                .endMetadata()
+                                .editOrNewSpec()
+                                .addToImagePullSecrets(getImagePullSecrets())
+                                .addToNodeSelector(templateNodeSelector)
+                                .endSpec()
+                                .build())
+                .build();
+    }
+
+    private LocalObjectReference[] getImagePullSecrets() {
+        return templateImagePullSecrets.stream()
+                .map(String::trim)
+                .filter(secret -> !secret.isEmpty())
+                .map(LocalObjectReference::new)
+                .toArray(LocalObjectReference[]::new);
+    }
+
+    protected void setupFlinkConfig() {
+        flinkConfig = TestUtils.createTestFlinkConfig();
+        flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_ANNOTATIONS, 
userAnnotations);
+        flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, 
userAnnotations);
+        flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, 
userLabels);
+        flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_LABELS, 
userLabels);
+        flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_NODE_SELECTOR, 
userNodeSelectors);
+        flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, 
userNodeSelectors);
+        flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, 
userImagePullSecrets);
+        
flinkConfig.setString(KubernetesConfigOptions.KUBERNETES_SECRETS.key(), 
SECRETS);
+    }
+}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParametersTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParametersTest.java
new file mode 100644
index 00000000..cf6c0250
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParametersTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.parameters;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link StandaloneKubernetesJobManagerParameters}. */
+public class StandaloneKubernetesJobManagerParametersTest extends 
ParametersTestBase {
+    private StandaloneKubernetesJobManagerParameters 
kubernetesJobManagerParameters;
+
+    @BeforeEach
+    public void setup() {
+        setupFlinkConfig();
+        ClusterSpecification clusterSpecification = 
TestUtils.createClusterSpecification();
+        kubernetesJobManagerParameters =
+                new StandaloneKubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+    }
+
+    @Test
+    public void testGetLabels() {
+        Map<String, String> expectedLabels = new HashMap<>();
+        expectedLabels.putAll(userLabels);
+        expectedLabels.putAll(
+                
StandaloneKubernetesUtils.getJobManagerSelectors(TestUtils.CLUSTER_ID));
+        assertEquals(expectedLabels, 
kubernetesJobManagerParameters.getLabels());
+    }
+
+    @Test
+    public void testGetSelectors() {
+        assertEquals(
+                
StandaloneKubernetesUtils.getJobManagerSelectors(TestUtils.CLUSTER_ID),
+                kubernetesJobManagerParameters.getSelectors());
+    }
+
+    @Test
+    public void testGetCommonLabels() {
+        assertEquals(
+                
StandaloneKubernetesUtils.getCommonLabels(TestUtils.CLUSTER_ID),
+                kubernetesJobManagerParameters.getCommonLabels());
+    }
+
+    @Test
+    public void testIsInternalServiceEnabled() {
+        assertTrue(kubernetesJobManagerParameters.isInternalServiceEnabled());
+    }
+
+    @Test
+    public void testIsApplicationCluster() {
+        flinkConfig.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                
StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+        assertTrue(kubernetesJobManagerParameters.isApplicationCluster());
+
+        flinkConfig.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+        assertFalse(kubernetesJobManagerParameters.isApplicationCluster());
+    }
+
+    @Test
+    public void testGetMainClass() {
+        String entryClass = "my.main.test.class";
+        flinkConfig.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, 
entryClass);
+
+        flinkConfig.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+        assertNull(kubernetesJobManagerParameters.getMainClass());
+
+        flinkConfig.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                
StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+        assertEquals(entryClass, 
kubernetesJobManagerParameters.getMainClass());
+    }
+}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParametersTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParametersTest.java
new file mode 100644
index 00000000..c794a630
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParametersTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.parameters;
+
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link StandaloneKubernetesTaskManagerParameters}. */
+public class StandaloneKubernetesTaskManagerParametersTest extends 
ParametersTestBase {
+
+    private StandaloneKubernetesTaskManagerParameters 
kubernetesTaskManagerParameters;
+
+    @BeforeEach
+    public void setup() {
+        setupFlinkConfig();
+        kubernetesTaskManagerParameters =
+                new StandaloneKubernetesTaskManagerParameters(flinkConfig, 
clusterSpecification);
+    }
+
+    @Test
+    public void testGetLabels() {
+        Map<String, String> expectedLabels = new HashMap<>();
+        expectedLabels.putAll(userLabels);
+        expectedLabels.putAll(
+                
StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID));
+        assertEquals(expectedLabels, 
kubernetesTaskManagerParameters.getLabels());
+    }
+
+    @Test
+    public void testGetSelectors() {
+        Map<String, String> expectedSelectors =
+                
StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID);
+
+        assertEquals(expectedSelectors, 
kubernetesTaskManagerParameters.getSelectors());
+    }
+
+    @Test
+    public void testGetNodeSelector() {
+        assertEquals(userNodeSelectors, 
kubernetesTaskManagerParameters.getNodeSelector());
+    }
+
+    @Test
+    public void testGetEnvironments() {
+        
assertTrue(kubernetesTaskManagerParameters.getEnvironments().isEmpty());
+    }
+
+    @Test
+    public void testGetAnnotations() {
+        assertEquals(userAnnotations, 
kubernetesTaskManagerParameters.getAnnotations());
+    }
+
+    @Test
+    public void testGetReplicas() {
+        int tmReplicas = 11;
+        flinkConfig.set(
+                
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS,
+                tmReplicas);
+        assertEquals(tmReplicas, 
kubernetesTaskManagerParameters.getReplicas());
+    }
+
+    @Test
+    public void testInvalidReplicas() {
+        flinkConfig.set(
+                
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS, -1);
+        kubernetesTaskManagerParameters =
+                new StandaloneKubernetesTaskManagerParameters(flinkConfig, 
clusterSpecification);
+
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> {
+                    kubernetesTaskManagerParameters.getReplicas();
+                });
+    }
+
+    @Test
+    public void testGetServiceAccount() {
+        assertEquals(
+                TestUtils.SERVICE_ACCOUNT, 
kubernetesTaskManagerParameters.getServiceAccount());
+    }
+
+    @Test
+    public void testGetTaskManagerMemoryMB() {
+        assertEquals(
+                TestUtils.TASK_MANAGER_MEMORY_MB,
+                kubernetesTaskManagerParameters.getTaskManagerMemoryMB());
+    }
+
+    @Test
+    public void testGetTaskManagerCPU() {
+        assertEquals(
+                TestUtils.TASK_MANAGER_CPU,
+                kubernetesTaskManagerParameters.getTaskManagerCPU(),
+                0.00001);
+    }
+
+    @Test
+    public void testGetGetRPCPort() {
+        assertEquals(Constants.TASK_MANAGER_RPC_PORT, 
kubernetesTaskManagerParameters.getRPCPort());
+    }
+
+    @Test
+    public void testInvalidRPCPort() {
+        flinkConfig.set(TaskManagerOptions.RPC_PORT, String.valueOf(0));
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> {
+                    kubernetesTaskManagerParameters.getRPCPort();
+                });
+    }
+
+    @Test
+    public void testGetPodTemplateFilePath() {
+        String templateFilePath = "/tmp/tst.yml";
+        
flinkConfig.setString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, 
templateFilePath);
+        Optional<File> templateFile = 
kubernetesTaskManagerParameters.getPodTemplateFilePath();
+        assertTrue(templateFile.isPresent());
+        String filePath = templateFile.get().getAbsolutePath();
+        assertEquals(templateFilePath, filePath);
+    }
+
+    @Test
+    public void testGetNoPodTemplateFilePath() {
+        
flinkConfig.removeConfig(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE);
+        Optional<File> templateFile = 
kubernetesTaskManagerParameters.getPodTemplateFilePath();
+        assertFalse(templateFile.isPresent());
+    }
+}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java
new file mode 100644
index 00000000..5a651930
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.utils;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Testing utilities. */
+public class TestUtils {
+
+    public static final String CLUSTER_ID = "test-cluster";
+    public static final String SERVICE_ACCOUNT = "flink-operator";
+    public static final String TEST_NAMESPACE = "flink-operator-test";
+
+    public static final String TASK_MANAGER_MEMORY = "2048m";
+    public static final String JOB_MANAGER_MEMORY = "1024m";
+
+    public static final String FLINK_VERSION = "latest";
+    public static final String IMAGE = String.format("flink:%s", 
FLINK_VERSION);
+    public static final String IMAGE_POLICY = "IfNotPresent";
+
+    public static final int TASK_MANAGER_MEMORY_MB =
+            MemorySize.parse(TASK_MANAGER_MEMORY).getMebiBytes();
+    public static final int JOB_MANAGER_MEMORY_MB =
+            MemorySize.parse(JOB_MANAGER_MEMORY).getMebiBytes();
+
+    public static final int SLOTS_PER_TASK_MANAGER = 2;
+
+    public static final double TASK_MANAGER_CPU = 4;
+    public static final double JOB_MANAGER_CPU = 2;
+
+    public static final Map<String, String> generateTestStringStringMap(
+            String keyPrefix, String valuePrefix, int entries) {
+        Map<String, String> map = new HashMap<>();
+        for (int i = 1; i <= entries; i++) {
+            map.put(keyPrefix + i, valuePrefix + i);
+        }
+        return map;
+    }
+
+    public static ClusterSpecification createClusterSpecification() {
+        return new ClusterSpecification.ClusterSpecificationBuilder()
+                .setMasterMemoryMB(JOB_MANAGER_MEMORY_MB)
+                .setTaskManagerMemoryMB(TASK_MANAGER_MEMORY_MB)
+                .setSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER)
+                .createClusterSpecification();
+    }
+
+    public static Configuration createTestFlinkConfig() {
+        Configuration flinkConf = new Configuration();
+        flinkConf.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
+        flinkConf.set(KubernetesConfigOptions.NAMESPACE, TEST_NAMESPACE);
+        flinkConf.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, 
SERVICE_ACCOUNT);
+        flinkConf.set(KubernetesConfigOptions.CONTAINER_IMAGE, IMAGE);
+        flinkConf.set(
+                KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
+                KubernetesConfigOptions.ImagePullPolicy.valueOf(IMAGE_POLICY));
+
+        flinkConf.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 
JOB_MANAGER_CPU);
+        flinkConf.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 
TASK_MANAGER_CPU);
+
+        flinkConf.setString(
+                TaskManagerOptions.RPC_PORT, 
String.valueOf(Constants.TASK_MANAGER_RPC_PORT));
+        flinkConf.setString(BlobServerOptions.PORT, 
String.valueOf(Constants.BLOB_SERVER_PORT));
+        flinkConf.setString(RestOptions.BIND_PORT, 
String.valueOf(Constants.REST_PORT));
+        return flinkConf;
+    }
+}
diff --git a/pom.xml b/pom.xml
index e548bfdd..6eddd541 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,7 @@ under the License.
     </scm>
 
     <modules>
+        <module>flink-kubernetes-standalone</module>
         <module>flink-kubernetes-shaded</module>
         <module>flink-kubernetes-operator</module>
         <module>flink-kubernetes-webhook</module>
@@ -62,7 +63,7 @@ under the License.
         <maven.compiler.source>11</maven.compiler.source>
         <maven.compiler.target>11</maven.compiler.target>
         <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
-        <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
+        <maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
         <maven-failsafe-plugin.version>3.0.0-M5</maven-failsafe-plugin.version>
         <maven-resources-plugin.version>3.2.0</maven-resources-plugin.version>
         <git-commit-id-maven-plugin.version>5.0.0</git-commit-id-maven-plugin.version>
@@ -81,6 +82,9 @@ under the License.
 
         <spotless.version>2.4.2</spotless.version>
         <it.skip>true</it.skip>
+
+        <hamcrest.version>1.3</hamcrest.version>
+
     </properties>
 
     <dependencyManagement>

Reply via email to