tillrohrmann commented on a change in pull request #14629: URL: https://github.com/apache/flink/pull/14629#discussion_r583726233
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java ########## @@ -49,64 +50,104 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator { private final KubernetesJobManagerParameters kubernetesJobManagerParameters; + private final Configuration flinkConfig; public InitJobManagerDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) { this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters); + this.flinkConfig = checkNotNull(kubernetesJobManagerParameters.getFlinkConfiguration()); } @Override public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { - final Pod basicPod = - new PodBuilder(flinkPod.getPod()) - .withApiVersion(API_VERSION) - .editOrNewMetadata() - .withLabels(kubernetesJobManagerParameters.getLabels()) - .withAnnotations(kubernetesJobManagerParameters.getAnnotations()) - .endMetadata() - .editOrNewSpec() - .withServiceAccountName(kubernetesJobManagerParameters.getServiceAccount()) - .withImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets()) - .withNodeSelector(kubernetesJobManagerParameters.getNodeSelector()) - .withTolerations( - kubernetesJobManagerParameters.getTolerations().stream() - .map( - e -> - KubernetesToleration.fromMap(e) - .getInternalResource()) - .collect(Collectors.toList())) - .endSpec() - .build(); + final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer()); + + // Overwrite fields + final String serviceAccountName = + KubernetesUtils.resolveUserDefinedValue( + flinkConfig, + KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, + kubernetesJobManagerParameters.getServiceAccount(), + flinkPod.getPodWithoutMainContainer().getSpec().getServiceAccount(), + "service account name"); + if (flinkPod.getPodWithoutMainContainer().getSpec().getRestartPolicy() != null) { + logger.info( + "The restart policy of JobManager pod will be overwritten to 
'always' " + + "since it is controlled by the Kubernetes deployment."); Review comment: Is `always` the default, or why don't we set it explicitly via `withRestartPolicy("Always")`? It's just a bit confusing that I don't see the builder call further down. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java ########## @@ -41,64 +42,108 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator { private final KubernetesTaskManagerParameters kubernetesTaskManagerParameters; + private final Configuration flinkConfig; public InitTaskManagerDecorator( KubernetesTaskManagerParameters kubernetesTaskManagerParameters) { this.kubernetesTaskManagerParameters = checkNotNull(kubernetesTaskManagerParameters); + this.flinkConfig = checkNotNull(kubernetesTaskManagerParameters.getFlinkConfiguration()); } @Override public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { - final Pod basicPod = - new PodBuilder(flinkPod.getPod()) - .withApiVersion(Constants.API_VERSION) - .editOrNewMetadata() - .withName(kubernetesTaskManagerParameters.getPodName()) - .withLabels(kubernetesTaskManagerParameters.getLabels()) - .withAnnotations(kubernetesTaskManagerParameters.getAnnotations()) - .endMetadata() - .editOrNewSpec() - .withServiceAccountName(kubernetesTaskManagerParameters.getServiceAccount()) - .withRestartPolicy(Constants.RESTART_POLICY_OF_NEVER) - .withImagePullSecrets(kubernetesTaskManagerParameters.getImagePullSecrets()) - .withNodeSelector(kubernetesTaskManagerParameters.getNodeSelector()) - .withTolerations( - kubernetesTaskManagerParameters.getTolerations().stream() - .map( - e -> - KubernetesToleration.fromMap(e) - .getInternalResource()) - .collect(Collectors.toList())) - .endSpec() - .build(); + final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer()); + + // Overwrite fields + final String serviceAccountName =
KubernetesUtils.resolveUserDefinedValue( + flinkConfig, + KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, Review comment: ```suggestion KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT, ``` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java ########## @@ -396,6 +405,74 @@ public static String getCommonStartCommand( .collect(Collectors.toList()); } + public static FlinkPod loadPodFromTemplateFile( + FlinkKubeClient kubeClient, File podTemplateFile, String mainContainerName) { + final KubernetesPod pod = kubeClient.loadPodFromTemplateFile(podTemplateFile); + final List<Container> otherContainers = new ArrayList<>(); + Container mainContainer = null; + + for (Container container : pod.getInternalResource().getSpec().getContainers()) { + if (mainContainerName.equals(container.getName())) { + mainContainer = container; + } else { + otherContainers.add(container); + } + } + + if (mainContainer == null) { + LOG.info( + "Could not find main container {} in pod template, using empty one to initialize.", + mainContainerName); + mainContainer = new ContainerBuilder().build(); + } + + pod.getInternalResource().getSpec().setContainers(otherContainers); + return new FlinkPod(pod.getInternalResource(), mainContainer); + } + + public static File getTaskManagerPodTemplateFileInPod() { + return new File( + Constants.POD_TEMPLATE_DIR_IN_POD, Constants.TASK_MANAGER_POD_TEMPLATE_FILE_NAME); + } + + /** + * Resolve the user defined value with the precedence. First an explicit config option value is + * taken, then the value in pod template and at last the default value of a config option if + * nothing is specified. 
+ * + * @param flinkConfig flink configuration + * @param configOption the config option to define the Kubernetes fields + * @param valueOfConfigOptionOrDefault the value defined by explicit config option or default + * @param valueOfPodTemplate the value defined in the pod template + * @param fieldDescription Kubernetes fields description + * @param <T> The type of value associated with the configuration option. + * @return the resolved value + */ + public static <T> String resolveUserDefinedValue( + Configuration flinkConfig, + ConfigOption<T> configOption, + String valueOfConfigOptionOrDefault, Review comment: Can we extract this value from `flinkConfig.get(configOption)`? This might make the method call a bit easier. ########## File path: docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md ########## @@ -324,4 +324,240 @@ $ kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit Please refer to the official Kubernetes documentation on [RBAC Authorization](https://kubernetes.io/docs/reference/access-authn-authz/rbac/) for more information. +### Pod Template + +Flink allows users to define the JobManager and TaskManager pods via template files. This allows to support advanced features +that are not supported by Flink [Kubernetes config options]({{< ref "docs/deployment/config" >}}#kubernetes) directly. +Use [`kubernetes.pod-template-file`]({{< ref "docs/deployment/config" >}}#kubernetes-pod-template-file) +to specify a local file that contains the pod definition. It will be used to initialize the JobManager and TaskManager. +The main container should be defined with name `flink-main-container`. +Please refer to the [pod template example](#example-of-pod-template) for more information. + +#### Fields Overwritten by Flink + +Some fields of the pod template will be overwritten by Flink. +The mechanism for resolving effective field values can be categorized as follows: +* **Defined by Flink:** User cannot configure it. 
+* **Defined by the user:** User can freely specify this value. Flink framework won't set any additional values and the effective value derives from the config option and the template. + + Precedence order: First an explicit config option value is taken, then the value in pod template and at last the default value of a config option if nothing is specified. +* **Merged with Flink:** Flink will merge values for a setting with a user defined value (see precedence order for "Defined by the user"). Flink values have precedence in case of same name fields. + +Refer to the following tables for the full list of pod fields that will be overwritten. +All the fields defined in the pod template that are not listed in the tables will be unaffected. + +**Pod Metadata** +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 10%">Key</th> + <th class="text-left" style="width: 20%">Category</th> + <th class="text-left" style="width: 30%">Related Config Options</th> + <th class="text-left" style="width: 40%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>name</td> + <td>Defined by Flink</td> + <td></td> + <td>The JobManager pod name will be overwritten with the deployment which is defined by <a href="{{< ref "docs/deployment/config" >}}#kubernetes-cluster-id">kubernetes.cluster-id</a>. 
+ The TaskManager pod names will be overwritten with the pattern <code><clusterID>-<attempt>-<index></code> which is generated by Flink ResourceManager.</td> + </tr> + <tr> + <td>namespace</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-namespace">kubernetes.namespace</a></td> + <td>Both the JobManager deployment and TaskManager pods will be created in the specified namespace.</td> + </tr> + <tr> + <td>ownerReferences</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-owner-reference">kubernetes.jobmanager.owner.reference</a></td> + <td>The owner reference of JobManager deployment could be set by configuration option. + And the owner reference of every TaskManager pod will be set to JobManager deployment.</td> Review comment: Maybe say that the value is resolved with respect to the defined precedence order for user defined values. Now it sounds as if the configuration option is the only way to define the value. This actually applies to all "Defined by the user" values. ########## File path: docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md ########## @@ -324,4 +324,240 @@ $ kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit Please refer to the official Kubernetes documentation on [RBAC Authorization](https://kubernetes.io/docs/reference/access-authn-authz/rbac/) for more information. +### Pod Template + +Flink allows users to define the JobManager and TaskManager pods via template files. This allows to support advanced features +that are not supported by Flink [Kubernetes config options]({{< ref "docs/deployment/config" >}}#kubernetes) directly. +Use [`kubernetes.pod-template-file`]({{< ref "docs/deployment/config" >}}#kubernetes-pod-template-file) +to specify a local file that contains the pod definition. It will be used to initialize the JobManager and TaskManager. 
+The main container should be defined with name `flink-main-container`. +Please refer to the [pod template example](#example-of-pod-template) for more information. + +#### Fields Overwritten by Flink + +Some fields of the pod template will be overwritten by Flink. +The mechanism for resolving effective field values can be categorized as follows: +* **Defined by Flink:** User cannot configure it. +* **Defined by the user:** User can freely specify this value. Flink framework won't set any additional values and the effective value derives from the config option and the template. + + Precedence order: First an explicit config option value is taken, then the value in pod template and at last the default value of a config option if nothing is specified. +* **Merged with Flink:** Flink will merge values for a setting with a user defined value (see precedence order for "Defined by the user"). Flink values have precedence in case of same name fields. + +Refer to the following tables for the full list of pod fields that will be overwritten. +All the fields defined in the pod template that are not listed in the tables will be unaffected. + +**Pod Metadata** +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 10%">Key</th> + <th class="text-left" style="width: 20%">Category</th> + <th class="text-left" style="width: 30%">Related Config Options</th> + <th class="text-left" style="width: 40%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>name</td> + <td>Defined by Flink</td> + <td></td> + <td>The JobManager pod name will be overwritten with the deployment which is defined by <a href="{{< ref "docs/deployment/config" >}}#kubernetes-cluster-id">kubernetes.cluster-id</a>. 
+ The TaskManager pod names will be overwritten with the pattern <code><clusterID>-<attempt>-<index></code> which is generated by Flink ResourceManager.</td> + </tr> + <tr> + <td>namespace</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-namespace">kubernetes.namespace</a></td> + <td>Both the JobManager deployment and TaskManager pods will be created in the specified namespace.</td> + </tr> + <tr> + <td>ownerReferences</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-owner-reference">kubernetes.jobmanager.owner.reference</a></td> + <td>The owner reference of JobManager deployment could be set by configuration option. + And the owner reference of every TaskManager pod will be set to JobManager deployment.</td> + </tr> + <tr> + <td>annotations</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-annotations">kubernetes.jobmanager.annotations</a> + <a href="{{< ref "docs/deployment/config" >}}#kubernetes-taskmanager-annotations">kubernetes.taskmanager.annotations</a></td> + <td>Flink will add additional annotations specified by the Flink configuration options.</td> + </tr> + <tr> + <td>labels</td> + <td>Merged with Flink</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-labels">kubernetes.jobmanager.labels</a> + <a href="{{< ref "docs/deployment/config" >}}#kubernetes-taskmanager-labels">kubernetes.taskmanager.labels</a></td> + <td>Flink will add some internal labels to the user defined values.</td> + </tr> + </tbody> +</table> + +**Pod Spec** +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 10%">Key</th> + <th class="text-left" style="width: 20%">Category</th> + <th class="text-left" style="width: 30%">Related Config Options</th> + <th class="text-left" style="width: 40%">Description</th> + </tr> + </thead> + <tbody> + <tr> + 
<td>imagePullSecrets</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-container-image-pull-secrets">kubernetes.container.image.pull-secrets</a></td> + <td>Flink will add additional pull secrets specified by the Flink configuration options.</td> + </tr> + <tr> + <td>nodeSelector</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-node-selector">kubernetes.jobmanager.node-selector</a> + <a href="{{< ref "docs/deployment/config" >}}#kubernetes-taskmanager-node-selector">kubernetes.taskmanager.node-selector</a></td> + <td>Flink will add additional node selectors specified by the Flink configuration options.</td> + </tr> + <tr> + <td>tolerations</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-tolerations">kubernetes.jobmanager.tolerations</a> + <a href="{{< ref "docs/deployment/config" >}}#kubernetes-taskmanager-tolerations">kubernetes.taskmanager.tolerations</a></td> + <td>Flink will add additional tolerations specified by the Flink configuration options.</td> + </tr> + <tr> + <td>restartPolicy</td> + <td>Defined by Flink</td> + <td></td> + <td>"always" for JobManager pod and "never" for TaskManager pod. + <br> + The JobManager pod will always be restarted by deployment. And the TaskManager pod should not be restarted.</td> + </tr> + <tr> + <td>serviceAccount</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-service-account">kubernetes.service-account</a></td> + <td>The JobManager deployment and TaskManager pods will be created with the specified service account.</td> + </tr> + <tr> + <td>volumes</td> + <td>Merged with Flink</td> + <td></td> + <td>Flink will add some internal ConfigMap volumes(e.g. 
flink-config-volume, hadoop-config-volume) which is necessary for shipping the Flink configuration and hadoop configuration.</td> + </tr> + </tbody> +</table> + +**Main Container Spec** +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 10%">Key</th> + <th class="text-left" style="width: 20%">Category</th> + <th class="text-left" style="width: 30%">Related Config Options</th> + <th class="text-left" style="width: 40%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>env</td> + <td>Merged with Flink</td> + <td><a href="{{< ref "docs/deployment/config" >}}#forwarding-environment-variables">containerized.master.env.{ENV_NAME}</a> + <a href="{{< ref "docs/deployment/config" >}}#forwarding-environment-variables">containerized.taskmanager.env.{ENV_NAME}</a></td> + <td>Flink will add some internal environment variables to the user defined values.</td> + </tr> + <tr> + <td>image</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-container-image">kubernetes.container.image</a></td> + <td>The container image will be overwritten by Flink configuration option.</td> + </tr> + <tr> + <td>imagePullPolicy</td> + <td>Defined by the user</td> + <td><a href="{{< ref "docs/deployment/config" >}}#kubernetes-container-image-pull-policy">kubernetes.container.image.pull-policy</a></td> + <td>The container image pull policy will be overwritten by Flink configuration option.</td> + </tr> + <tr> + <td>name</td> + <td>Defined by Flink</td> + <td></td> + <td>The container name will be overwritten by Flink with "flink-main-container".</td> + </tr> + <tr> + <td>resources</td> + <td>Defined by the user</td> + <td>Memory: <br> + <a href="{{< ref "docs/deployment/config" >}}#jobmanager-memory-process-size">jobmanager.memory.process.size</a> + <a href="{{< ref "docs/deployment/config" >}}#taskmanager-memory-process-size">taskmanager.memory.process.size</a> + <br> + CPU: <br> + <a href="{{< ref 
"docs/deployment/config" >}}#kubernetes-jobmanager-cpu">kubernetes.jobmanager.cpu</a> + <a href="{{< ref "docs/deployment/config" >}}#kubernetes-taskmanager-cpu">kubernetes.taskmanager.cpu</a></td> + <td>The memory and cpu resources(including requests and limits) will be overwritten by Flink. All other resources(e.g. ephemeral-storage) will be retained.</td> Review comment: Did you mean this? ```suggestion <td>The memory and cpu resources(including requests and limits) will be overwritten by Flink configuration options. All other resources(e.g. ephemeral-storage) will be retained.</td> ``` ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/PodTemplateMountDecoratorTest.java ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.kubernetes.KubernetesTestUtils; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; +import org.apache.flink.kubernetes.utils.Constants; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.KeyToPath; +import io.fabric8.kubernetes.api.model.KeyToPathBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.kubernetes.utils.Constants.TASK_MANAGER_POD_TEMPLATE_FILE_NAME; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** General tests for the {@link PodTemplateMountDecorator}. 
*/ +public class PodTemplateMountDecoratorTest extends KubernetesJobManagerTestBase { + + private static final String POD_TEMPLATE_FILE_NAME = "testing-pod-template.yaml"; + private static final String POD_TEMPLATE_DATA = "taskmanager pod template data"; + + private PodTemplateMountDecorator podTemplateMountDecorator; + + @Override + protected void setupFlinkConfig() { + super.setupFlinkConfig(); + + this.flinkConfig.set( + KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, + new File(flinkConfDir, POD_TEMPLATE_FILE_NAME).getAbsolutePath()); + } + + @Override + protected void onSetup() throws Exception { + super.onSetup(); + + this.podTemplateMountDecorator = + new PodTemplateMountDecorator(kubernetesJobManagerParameters); + } + + @Test + public void testBuildAccompanyingKubernetesResourcesAddsPodTemplateAsConfigMap() + throws IOException { + KubernetesTestUtils.createTemporyFile( + POD_TEMPLATE_DATA, flinkConfDir, POD_TEMPLATE_FILE_NAME); + + final List<HasMetadata> additionalResources = + podTemplateMountDecorator.buildAccompanyingKubernetesResources(); + assertThat(additionalResources.size(), is(1)); + + final ConfigMap resultConfigMap = (ConfigMap) additionalResources.get(0); + + Map<String, String> resultDatas = resultConfigMap.getData(); Review comment: ```suggestion Map<String, String> resultData = resultConfigMap.getData(); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org