tillrohrmann commented on a change in pull request #14629:
URL: https://github.com/apache/flink/pull/14629#discussion_r583726233



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
##########
@@ -49,64 +50,104 @@
 public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
 
     private final KubernetesJobManagerParameters 
kubernetesJobManagerParameters;
+    private final Configuration flinkConfig;
 
     public InitJobManagerDecorator(KubernetesJobManagerParameters 
kubernetesJobManagerParameters) {
         this.kubernetesJobManagerParameters = 
checkNotNull(kubernetesJobManagerParameters);
+        this.flinkConfig = 
checkNotNull(kubernetesJobManagerParameters.getFlinkConfiguration());
     }
 
     @Override
     public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
-        final Pod basicPod =
-                new PodBuilder(flinkPod.getPod())
-                        .withApiVersion(API_VERSION)
-                        .editOrNewMetadata()
-                        .withLabels(kubernetesJobManagerParameters.getLabels())
-                        
.withAnnotations(kubernetesJobManagerParameters.getAnnotations())
-                        .endMetadata()
-                        .editOrNewSpec()
-                        
.withServiceAccountName(kubernetesJobManagerParameters.getServiceAccount())
-                        
.withImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets())
-                        
.withNodeSelector(kubernetesJobManagerParameters.getNodeSelector())
-                        .withTolerations(
-                                
kubernetesJobManagerParameters.getTolerations().stream()
-                                        .map(
-                                                e ->
-                                                        
KubernetesToleration.fromMap(e)
-                                                                
.getInternalResource())
-                                        .collect(Collectors.toList()))
-                        .endSpec()
-                        .build();
+        final PodBuilder basicPodBuilder = new 
PodBuilder(flinkPod.getPodWithoutMainContainer());
+
+        // Overwrite fields
+        final String serviceAccountName =
+                KubernetesUtils.resolveUserDefinedValue(
+                        flinkConfig,
+                        KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT,
+                        kubernetesJobManagerParameters.getServiceAccount(),
+                        
flinkPod.getPodWithoutMainContainer().getSpec().getServiceAccount(),
+                        "service account name");
+        if (flinkPod.getPodWithoutMainContainer().getSpec().getRestartPolicy() 
!= null) {
+            logger.info(
+                    "The restart policy of JobManager pod will be overwritten 
to 'always' "
+                            + "since it is controlled by the Kubernetes 
deployment.");

Review comment:
       Is `always` the default or why don't we set it explicity via 
`withRestartPolicy("Always")`? It just a bit confusing that I don't see the 
builder call further down.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
##########
@@ -41,64 +42,108 @@
 public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
 
     private final KubernetesTaskManagerParameters 
kubernetesTaskManagerParameters;
+    private final Configuration flinkConfig;
 
     public InitTaskManagerDecorator(
             KubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
         this.kubernetesTaskManagerParameters = 
checkNotNull(kubernetesTaskManagerParameters);
+        this.flinkConfig = 
checkNotNull(kubernetesTaskManagerParameters.getFlinkConfiguration());
     }
 
     @Override
     public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
-        final Pod basicPod =
-                new PodBuilder(flinkPod.getPod())
-                        .withApiVersion(Constants.API_VERSION)
-                        .editOrNewMetadata()
-                        .withName(kubernetesTaskManagerParameters.getPodName())
-                        
.withLabels(kubernetesTaskManagerParameters.getLabels())
-                        
.withAnnotations(kubernetesTaskManagerParameters.getAnnotations())
-                        .endMetadata()
-                        .editOrNewSpec()
-                        
.withServiceAccountName(kubernetesTaskManagerParameters.getServiceAccount())
-                        .withRestartPolicy(Constants.RESTART_POLICY_OF_NEVER)
-                        
.withImagePullSecrets(kubernetesTaskManagerParameters.getImagePullSecrets())
-                        
.withNodeSelector(kubernetesTaskManagerParameters.getNodeSelector())
-                        .withTolerations(
-                                
kubernetesTaskManagerParameters.getTolerations().stream()
-                                        .map(
-                                                e ->
-                                                        
KubernetesToleration.fromMap(e)
-                                                                
.getInternalResource())
-                                        .collect(Collectors.toList()))
-                        .endSpec()
-                        .build();
+        final PodBuilder basicPodBuilder = new 
PodBuilder(flinkPod.getPodWithoutMainContainer());
+
+        // Overwrite fields
+        final String serviceAccountName =
+                KubernetesUtils.resolveUserDefinedValue(
+                        flinkConfig,
+                        KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT,

Review comment:
       ```suggestion
                           KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT,
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -396,6 +405,74 @@ public static String getCommonStartCommand(
                 .collect(Collectors.toList());
     }
 
+    public static FlinkPod loadPodFromTemplateFile(
+            FlinkKubeClient kubeClient, File podTemplateFile, String 
mainContainerName) {
+        final KubernetesPod pod = 
kubeClient.loadPodFromTemplateFile(podTemplateFile);
+        final List<Container> otherContainers = new ArrayList<>();
+        Container mainContainer = null;
+
+        for (Container container : 
pod.getInternalResource().getSpec().getContainers()) {
+            if (mainContainerName.equals(container.getName())) {
+                mainContainer = container;
+            } else {
+                otherContainers.add(container);
+            }
+        }
+
+        if (mainContainer == null) {
+            LOG.info(
+                    "Could not find main container {} in pod template, using 
empty one to initialize.",
+                    mainContainerName);
+            mainContainer = new ContainerBuilder().build();
+        }
+
+        pod.getInternalResource().getSpec().setContainers(otherContainers);
+        return new FlinkPod(pod.getInternalResource(), mainContainer);
+    }
+
+    public static File getTaskManagerPodTemplateFileInPod() {
+        return new File(
+                Constants.POD_TEMPLATE_DIR_IN_POD, 
Constants.TASK_MANAGER_POD_TEMPLATE_FILE_NAME);
+    }
+
+    /**
+     * Resolve the user defined value with the precedence. First an explicit 
config option value is
+     * taken, then the value in pod template and at last the default value of 
a config option if
+     * nothing is specified.
+     *
+     * @param flinkConfig flink configuration
+     * @param configOption the config option to define the Kubernetes fields
+     * @param valueOfConfigOptionOrDefault the value defined by explicit 
config option or default
+     * @param valueOfPodTemplate the value defined in the pod template
+     * @param fieldDescription Kubernetes fields description
+     * @param <T> The type of value associated with the configuration option.
+     * @return the resolved value
+     */
+    public static <T> String resolveUserDefinedValue(
+            Configuration flinkConfig,
+            ConfigOption<T> configOption,
+            String valueOfConfigOptionOrDefault,

Review comment:
       Can we extract this value from `flinkConfig.get(configOption)`? This 
might make the method call a bit easier.

##########
File path: 
docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -324,4 +324,240 @@ $ kubectl create clusterrolebinding 
flink-role-binding-flink --clusterrole=edit
 
 Please refer to the official Kubernetes documentation on [RBAC 
Authorization](https://kubernetes.io/docs/reference/access-authn-authz/rbac/) 
for more information.
 
+### Pod Template
+
+Flink allows users to define the JobManager and TaskManager pods via template 
files. This allows to support advanced features
+that are not supported by Flink [Kubernetes config options]({{< ref 
"docs/deployment/config" >}}#kubernetes) directly.
+Use [`kubernetes.pod-template-file`]({{< ref "docs/deployment/config" 
>}}#kubernetes-pod-template-file)
+to specify a local file that contains the pod definition. It will be used to 
initialize the JobManager and TaskManager.
+The main container should be defined with name `flink-main-container`.
+Please refer to the [pod template example](#example-of-pod-template) for more 
information.
+
+#### Fields Overwritten by Flink
+
+Some fields of the pod template will be overwritten by Flink.
+The mechanism for resolving effective field values can be categorized as 
follows:
+* **Defined by Flink:** User cannot configure it.
+* **Defined by the user:** User can freely specify this value. Flink framework 
won't set any additional values and the effective value derives from the config 
option and the template.
+
+  Precedence order: First an explicit config option value is taken, then the 
value in pod template and at last the default value of a config option if 
nothing is specified.
+* **Merged with Flink:** Flink will merge values for a setting with a user 
defined value (see precedence order for "Defined by the user"). Flink values 
have precedence in case of same name fields.
+
+Refer to the following tables for the full list of pod fields that will be 
overwritten.
+All the fields defined in the pod template that are not listed in the tables 
will be unaffected.
+
+**Pod Metadata**
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 10%">Key</th>
+            <th class="text-left" style="width: 20%">Category</th>
+            <th class="text-left" style="width: 30%">Related Config 
Options</th>
+            <th class="text-left" style="width: 40%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>name</td>
+            <td>Defined by Flink</td>
+            <td></td>
+            <td>The JobManager pod name will be overwritten with the 
deployment which is defined by <a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-cluster-id">kubernetes.cluster-id</a>.
+                The TaskManager pod names will be overwritten with the pattern 
<code>&lt;clusterID&gt;-&lt;attempt&gt;-&lt;index&gt;</code> which is generated 
by Flink ResourceManager.</td>
+        </tr>
+        <tr>
+            <td>namespace</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-namespace">kubernetes.namespace</a></td>
+            <td>Both the JobManager deployment and TaskManager pods will be 
created in the specified namespace.</td>
+        </tr>
+        <tr>
+            <td>ownerReferences</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-jobmanager-owner-reference">kubernetes.jobmanager.owner.reference</a></td>
+            <td>The owner reference of JobManager deployment could be set by 
configuration option.
+                And the owner reference of every TaskManager pod will be set 
to JobManager deployment.</td>

Review comment:
       Maybe say that the value is resolved with respect to the defined 
precedence order for user defined values. Now it sounds as if the configuration 
option is the only way to define the value. This actually applies to all 
"Defined by the user" values.

##########
File path: 
docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -324,4 +324,240 @@ $ kubectl create clusterrolebinding 
flink-role-binding-flink --clusterrole=edit
 
 Please refer to the official Kubernetes documentation on [RBAC 
Authorization](https://kubernetes.io/docs/reference/access-authn-authz/rbac/) 
for more information.
 
+### Pod Template
+
+Flink allows users to define the JobManager and TaskManager pods via template 
files. This allows to support advanced features
+that are not supported by Flink [Kubernetes config options]({{< ref 
"docs/deployment/config" >}}#kubernetes) directly.
+Use [`kubernetes.pod-template-file`]({{< ref "docs/deployment/config" 
>}}#kubernetes-pod-template-file)
+to specify a local file that contains the pod definition. It will be used to 
initialize the JobManager and TaskManager.
+The main container should be defined with name `flink-main-container`.
+Please refer to the [pod template example](#example-of-pod-template) for more 
information.
+
+#### Fields Overwritten by Flink
+
+Some fields of the pod template will be overwritten by Flink.
+The mechanism for resolving effective field values can be categorized as 
follows:
+* **Defined by Flink:** User cannot configure it.
+* **Defined by the user:** User can freely specify this value. Flink framework 
won't set any additional values and the effective value derives from the config 
option and the template.
+
+  Precedence order: First an explicit config option value is taken, then the 
value in pod template and at last the default value of a config option if 
nothing is specified.
+* **Merged with Flink:** Flink will merge values for a setting with a user 
defined value (see precedence order for "Defined by the user"). Flink values 
have precedence in case of same name fields.
+
+Refer to the following tables for the full list of pod fields that will be 
overwritten.
+All the fields defined in the pod template that are not listed in the tables 
will be unaffected.
+
+**Pod Metadata**
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 10%">Key</th>
+            <th class="text-left" style="width: 20%">Category</th>
+            <th class="text-left" style="width: 30%">Related Config 
Options</th>
+            <th class="text-left" style="width: 40%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>name</td>
+            <td>Defined by Flink</td>
+            <td></td>
+            <td>The JobManager pod name will be overwritten with the 
deployment which is defined by <a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-cluster-id">kubernetes.cluster-id</a>.
+                The TaskManager pod names will be overwritten with the pattern 
<code>&lt;clusterID&gt;-&lt;attempt&gt;-&lt;index&gt;</code> which is generated 
by Flink ResourceManager.</td>
+        </tr>
+        <tr>
+            <td>namespace</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-namespace">kubernetes.namespace</a></td>
+            <td>Both the JobManager deployment and TaskManager pods will be 
created in the specified namespace.</td>
+        </tr>
+        <tr>
+            <td>ownerReferences</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-jobmanager-owner-reference">kubernetes.jobmanager.owner.reference</a></td>
+            <td>The owner reference of JobManager deployment could be set by 
configuration option.
+                And the owner reference of every TaskManager pod will be set 
to JobManager deployment.</td>
+        </tr>
+        <tr>
+            <td>annotations</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-jobmanager-annotations">kubernetes.jobmanager.annotations</a>
+                <a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-taskmanager-annotations">kubernetes.taskmanager.annotations</a></td>
+            <td>Flink will add additional annotations specified by the Flink 
configuration options.</td>
+        </tr>
+        <tr>
+            <td>labels</td>
+            <td>Merged with Flink</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-jobmanager-labels">kubernetes.jobmanager.labels</a>
+                <a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-taskmanager-labels">kubernetes.taskmanager.labels</a></td>
+            <td>Flink will add some internal labels to the user defined 
values.</td>
+        </tr>
+    </tbody>
+</table>
+
+**Pod Spec**
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 10%">Key</th>
+            <th class="text-left" style="width: 20%">Category</th>
+            <th class="text-left" style="width: 30%">Related Config 
Options</th>
+            <th class="text-left" style="width: 40%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>imagePullSecrets</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-container-image-pull-secrets">kubernetes.container.image.pull-secrets</a></td>
+            <td>Flink will add additional pull secrets specified by the Flink 
configuration options.</td>
+        </tr>
+        <tr>
+            <td>nodeSelector</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-jobmanager-node-selector">kubernetes.jobmanager.node-selector</a>
+                <a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-taskmanager-node-selector">kubernetes.taskmanager.node-selector</a></td>
+            <td>Flink will add additional node selectors specified by the 
Flink configuration options.</td>
+        </tr>
+        <tr>
+            <td>tolerations</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-jobmanager-tolerations">kubernetes.jobmanager.tolerations</a>
+                <a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-taskmanager-tolerations">kubernetes.taskmanager.tolerations</a></td>
+            <td>Flink will add additional tolerations specified by the Flink 
configuration options.</td>
+        </tr>
+        <tr>
+            <td>restartPolicy</td>
+            <td>Defined by Flink</td>
+            <td></td>
+            <td>"always" for JobManager pod and "never" for TaskManager pod.
+                <br>
+                The JobManager pod will always be restarted by deployment. And 
the TaskManager pod should not be restarted.</td>
+        </tr>
+        <tr>
+            <td>serviceAccount</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-service-account">kubernetes.service-account</a></td>
+            <td>The JobManager deployment and TaskManager pods will be created 
with the specified service account.</td>
+        </tr>
+        <tr>
+            <td>volumes</td>
+            <td>Merged with Flink</td>
+            <td></td>
+            <td>Flink will add some internal ConfigMap volumes(e.g. 
flink-config-volume, hadoop-config-volume) which is necessary for shipping the 
Flink configuration and hadoop configuration.</td>
+        </tr>
+    </tbody>
+</table>
+
+**Main Container Spec**
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 10%">Key</th>
+            <th class="text-left" style="width: 20%">Category</th>
+            <th class="text-left" style="width: 30%">Related Config 
Options</th>
+            <th class="text-left" style="width: 40%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>env</td>
+            <td>Merged with Flink</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#forwarding-environment-variables">containerized.master.env.{ENV_NAME}</a>
+                <a href="{{< ref "docs/deployment/config" 
>}}#forwarding-environment-variables">containerized.taskmanager.env.{ENV_NAME}</a></td>
+            <td>Flink will add some internal environment variables to the user 
defined values.</td>
+        </tr>
+        <tr>
+            <td>image</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-container-image">kubernetes.container.image</a></td>
+            <td>The container image will be overwritten by Flink configuration 
option.</td>
+        </tr>
+        <tr>
+            <td>imagePullPolicy</td>
+            <td>Defined by the user</td>
+            <td><a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-container-image-pull-policy">kubernetes.container.image.pull-policy</a></td>
+            <td>The container image pull policy will be overwritten by Flink 
configuration option.</td>
+        </tr>
+        <tr>
+            <td>name</td>
+            <td>Defined by Flink</td>
+            <td></td>
+            <td>The container name will be overwritten by Flink with 
"flink-main-container".</td>
+        </tr>
+        <tr>
+            <td>resources</td>
+            <td>Defined by the user</td>
+            <td>Memory: <br>
+                    <a href="{{< ref "docs/deployment/config" 
>}}#jobmanager-memory-process-size">jobmanager.memory.process.size</a>
+                    <a href="{{< ref "docs/deployment/config" 
>}}#taskmanager-memory-process-size">taskmanager.memory.process.size</a>
+                <br>
+                CPU: <br>
+                    <a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-jobmanager-cpu">kubernetes.jobmanager.cpu</a>
+                    <a href="{{< ref "docs/deployment/config" 
>}}#kubernetes-taskmanager-cpu">kubernetes.taskmanager.cpu</a></td>
+            <td>The memory and cpu resources(including requests and limits) 
will be overwritten by Flink. All other resources(e.g. ephemeral-storage) will 
be retained.</td>

Review comment:
       Did you mean this?
   
   ```suggestion
               <td>The memory and cpu resources(including requests and limits) 
will be overwritten by Flink configuration options. All other resources(e.g. 
ephemeral-storage) will be retained.</td>
   ```

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/PodTemplateMountDecoratorTest.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.kubernetes.KubernetesTestUtils;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KeyToPath;
+import io.fabric8.kubernetes.api.model.KeyToPathBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.TASK_MANAGER_POD_TEMPLATE_FILE_NAME;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/** General tests for the {@link PodTemplateMountDecorator}. */
+public class PodTemplateMountDecoratorTest extends 
KubernetesJobManagerTestBase {
+
+    private static final String POD_TEMPLATE_FILE_NAME = 
"testing-pod-template.yaml";
+    private static final String POD_TEMPLATE_DATA = "taskmanager pod template 
data";
+
+    private PodTemplateMountDecorator podTemplateMountDecorator;
+
+    @Override
+    protected void setupFlinkConfig() {
+        super.setupFlinkConfig();
+
+        this.flinkConfig.set(
+                KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE,
+                new File(flinkConfDir, 
POD_TEMPLATE_FILE_NAME).getAbsolutePath());
+    }
+
+    @Override
+    protected void onSetup() throws Exception {
+        super.onSetup();
+
+        this.podTemplateMountDecorator =
+                new PodTemplateMountDecorator(kubernetesJobManagerParameters);
+    }
+
+    @Test
+    public void 
testBuildAccompanyingKubernetesResourcesAddsPodTemplateAsConfigMap()
+            throws IOException {
+        KubernetesTestUtils.createTemporyFile(
+                POD_TEMPLATE_DATA, flinkConfDir, POD_TEMPLATE_FILE_NAME);
+
+        final List<HasMetadata> additionalResources =
+                
podTemplateMountDecorator.buildAccompanyingKubernetesResources();
+        assertThat(additionalResources.size(), is(1));
+
+        final ConfigMap resultConfigMap = (ConfigMap) 
additionalResources.get(0);
+
+        Map<String, String> resultDatas = resultConfigMap.getData();

Review comment:
       ```suggestion
           Map<String, String> resultData = resultConfigMap.getData();
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to