xintongsong commented on a change in pull request #14629:
URL: https://github.com/apache/flink/pull/14629#discussion_r579053748



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
##########
@@ -107,4 +107,7 @@
      * container(s).
      */
     List<Map<String, String>> getEnvironmentsFromSecrets();
+
+    /** The pod template file path. */
+    String getPodTemplateFilePath();

Review comment:
       We could return `Optional<String>` to force the existence check.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -396,6 +401,42 @@ public static String getCommonStartCommand(
                 .collect(Collectors.toList());
     }
 
+    public static FlinkPod loadPodFromTemplateFile(
+            FlinkKubeClient kubeClient, @Nullable String path, String 
mainContainerName) {
+        if (path == null || !new File(path).exists()) {
+            return new FlinkPod.Builder().build();
+        }
+
+        final KubernetesPod pod = kubeClient.loadPodFromTemplateFile(path);
+        final List<Container> otherContainers = new ArrayList<>();
+        Container mainContainer = null;
+
+        for (Container container : 
pod.getInternalResource().getSpec().getContainers()) {
+            if (container.getName().equals(mainContainerName)) {

Review comment:
       Are we sure that `container.getName()` is non-null?
   Maybe `mainContainerName.equals(container.getName())`?

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/utils/KubernetesUtilsTest.java
##########
@@ -75,6 +88,155 @@ public void testCheckWithFixedPort() {
         testCheckAndUpdatePortConfigOption("6123", "16123", "6123");
     }
 
+    @Test
+    public void testLoadPodFromTemplateWithNullPathShouldReturnEmptyPod() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient, null, 
JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod(), is(EMPTY_POD.getPod()));
+        assertThat(flinkPod.getMainContainer(), 
is(EMPTY_POD.getMainContainer()));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateWithNonExistPathShouldReturnEmptyPod() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        "/path/of/non-exist.yaml",
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod(), is(EMPTY_POD.getPod()));
+        assertThat(flinkPod.getMainContainer(), 
is(EMPTY_POD.getMainContainer()));
+    }
+
+    @Test
+    public void 
testLoadPodFromTemplateWithNoMainContainerShouldReturnEmptyPod() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        "nonExistMainContainer");
+        assertThat(flinkPod.getMainContainer(), 
is(EMPTY_POD.getMainContainer()));
+        assertThat(flinkPod.getPod().getSpec().getContainers().size(), is(2));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckMetaData() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod().getMetadata().getName(), 
is("pod-template"));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckInitContainer() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod().getSpec().getInitContainers().size(), 
is(1));
+        Container expectedInitContainer =
+                new ContainerBuilder()
+                        .withName("artifacts-fetcher")
+                        .withImage("busybox")
+                        .withCommand(
+                                "wget",
+                                "https://path/of/StateMachineExample.jar";,
+                                "-O",
+                                "/flink-artifact/myjob.jar")
+                        .withVolumeMounts(
+                                new VolumeMountBuilder()
+                                        .withName("flink-artifact")
+                                        .withMountPath("/flink-artifact")
+                                        .build())
+                        .build();
+        assertThat(
+                flinkPod.getPod().getSpec().getInitContainers().get(0), 
is(expectedInitContainer));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckMainContainer() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getMainContainer().getName(), 
is(JOB_MANAGER_MAIN_CONTAINER_NAME));
+        final VolumeMount[] expectedVolumeMounts =
+                new VolumeMount[] {
+                    new VolumeMountBuilder()
+                            .withName("flink-volume-hostpath")
+                            .withMountPath("/opt/flink/volumes/hostpath")
+                            .build(),
+                    new VolumeMountBuilder()
+                            .withName("flink-artifact")
+                            .withMountPath("/opt/flink/artifacts")
+                            .build(),
+                    new VolumeMountBuilder()
+                            .withName("flink-logs")
+                            .withMountPath("/opt/flink/log")
+                            .build()
+                };
+        assertThat(
+                flinkPod.getMainContainer().getVolumeMounts(),
+                containsInAnyOrder(expectedVolumeMounts));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckSideCarContainer() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod().getSpec().getContainers().size(), is(1));
+        Container expectedSideCarContainer =
+                new ContainerBuilder()
+                        .withName("sidecar-log-collector")
+                        .withImage("busybox")
+                        .withCommand("command-to-upload", 
"/flink-logs/jobmanager.log")
+                        .withVolumeMounts(
+                                new VolumeMountBuilder()
+                                        .withName("flink-logs")
+                                        .withMountPath("/flink-logs")
+                                        .build())
+                        .build();
+        assertThat(
+                flinkPod.getPod().getSpec().getContainers().get(0), 
is(expectedSideCarContainer));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckVolumes() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);

Review comment:
       I wonder whether we really need to introduce two testing template files. 
Can we introduce one template file and point both JM/TM configuration options to it?

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
##########
@@ -243,4 +217,65 @@ public void testDecoratedFlinkContainer() {
         assertEquals(Constants.FLINK_CONF_VOLUME, volumeMount.getName());
         assertEquals(FLINK_CONF_DIR_IN_POD, volumeMount.getMountPath());
     }
+
+    @Test
+    public void testConfigMapWithTaskManagerPodTemplate() throws IOException {
+        final String podTemplateData = "taskmanager pod template data";
+        KubernetesTestUtils.createTemporyFile(
+                podTemplateData, flinkConfDir, 
TASK_MANAGER_POD_TEMPLATE_FILE_NAME);

Review comment:
       Deduplicate with `prepareTaskManagerPodTemplateLocalFile`.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
##########
@@ -65,6 +68,9 @@ protected void setupFlinkConfig() {
         super.setupFlinkConfig();
 
         this.flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, 
FLINK_CONF_DIR_IN_POD);
+        this.flinkConfig.set(
+                KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE,
+                new File(flinkConfDir, 
TASK_MANAGER_POD_TEMPLATE_FILE_NAME).getAbsolutePath());

Review comment:
       Maybe use an arbitrary temp folder and file name.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -149,7 +157,8 @@ public void deregisterApplication(
         final KubernetesTaskManagerParameters parameters =
                 createKubernetesTaskManagerParameters(taskExecutorProcessSpec);
         final KubernetesPod taskManagerPod =
-                
KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(parameters);
+                KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(
+                        
Preconditions.checkNotNull(taskManagerPodTemplate).clone(), parameters);

Review comment:
       I would suggest wrapping the null-checking, cloning and pod building into 
`buildTaskManagerPodFromTemplate`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkPod.java
##########
@@ -48,6 +48,12 @@ public Container getMainContainer() {
         return mainContainer;
     }
 
+    public FlinkPod clone() {

Review comment:
       `@Override`

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
##########
@@ -99,6 +99,11 @@ public KubernetesTaskManagerParameters(
                 .orElse(Collections.emptyList());
     }
 
+    @Override
+    public String getPodTemplateFilePath() {
+        return KubernetesUtils.getTaskManagerPodTemplatePathInPod(flinkConfig);
+    }

Review comment:
       This implementation for TM is never used.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkPod.java
##########
@@ -48,6 +48,12 @@ public Container getMainContainer() {
         return mainContainer;
     }
 
+    public FlinkPod clone() {
+        return new FlinkPod(
+                new PodBuilder(this.getPod()).build(),
+                new ContainerBuilder(this.getMainContainer()).build());

Review comment:
       It's super implicit that `FlinkPod#pod` is guaranteed not to contain the 
main container, and that the main container is only added to the pod in 
`KubernetesJob/TaskManagerFactory`. Maybe we should rename it to 
`FlinkPod#podWithoutMainContainer`.
   
   It's not introduced by this PR though.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -309,12 +309,15 @@ public static CompletedCheckpointStore 
createCompletedCheckpointStore(
      * @return KubernetesResource requirements.
      */
     public static ResourceRequirements getResourceRequirements(

Review comment:
       JavaDoc not updated

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
##########
@@ -459,4 +464,28 @@ public void testEmptyHadoopConfDirectory() throws 
IOException {
                                                                 
.getHadoopConfConfigMapName(
                                                                         
CLUSTER_ID))));
     }
+
+    @Test
+    public void testCreateJobManagerDeploymentWithPodTemplate() throws 
Exception {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        
KubernetesJobManagerParameters.JOB_MANAGER_MAIN_CONTAINER_NAME);
+
+        final KubernetesJobManagerSpecification 
kubernetesJobManagerSpecification =
+                
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        flinkPod, kubernetesJobManagerParameters);
+
+        final PodTemplateSpec resultPod =
+                
kubernetesJobManagerSpecification.getDeployment().getSpec().getTemplate();
+        // Pod template has been applied successfully.
+        assertThat(resultPod.getSpec().getInitContainers().size(), is(1));
+        assertThat(resultPod.getSpec().getContainers().size(), is(2));
+        assertThat(
+                resultPod.getSpec().getContainers().stream()
+                        .map(Container::getName)
+                        .collect(Collectors.toList()),
+                containsInAnyOrder("flink-job-manager", 
"sidecar-log-collector"));

Review comment:
       I think we need to verify that other functions also take effect after 
going through all the decorators, like what we do in `KubernetesUtilsTest` 
(metadata, init container, volumes, etc.) and 
`InitJob/TaskManagerDecoratorWithPodTemplateTest` (labels, annotations, env, 
etc.).

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/utils/KubernetesUtilsTest.java
##########
@@ -75,6 +88,155 @@ public void testCheckWithFixedPort() {
         testCheckAndUpdatePortConfigOption("6123", "16123", "6123");
     }
 
+    @Test
+    public void testLoadPodFromTemplateWithNullPathShouldReturnEmptyPod() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient, null, 
JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod(), is(EMPTY_POD.getPod()));
+        assertThat(flinkPod.getMainContainer(), 
is(EMPTY_POD.getMainContainer()));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateWithNonExistPathShouldReturnEmptyPod() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        "/path/of/non-exist.yaml",
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod(), is(EMPTY_POD.getPod()));
+        assertThat(flinkPod.getMainContainer(), 
is(EMPTY_POD.getMainContainer()));
+    }
+
+    @Test
+    public void 
testLoadPodFromTemplateWithNoMainContainerShouldReturnEmptyPod() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        "nonExistMainContainer");
+        assertThat(flinkPod.getMainContainer(), 
is(EMPTY_POD.getMainContainer()));
+        assertThat(flinkPod.getPod().getSpec().getContainers().size(), is(2));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckMetaData() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod().getMetadata().getName(), 
is("pod-template"));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckInitContainer() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod().getSpec().getInitContainers().size(), 
is(1));
+        Container expectedInitContainer =
+                new ContainerBuilder()
+                        .withName("artifacts-fetcher")
+                        .withImage("busybox")
+                        .withCommand(
+                                "wget",
+                                "https://path/of/StateMachineExample.jar";,
+                                "-O",
+                                "/flink-artifact/myjob.jar")
+                        .withVolumeMounts(
+                                new VolumeMountBuilder()
+                                        .withName("flink-artifact")
+                                        .withMountPath("/flink-artifact")
+                                        .build())
+                        .build();
+        assertThat(
+                flinkPod.getPod().getSpec().getInitContainers().get(0), 
is(expectedInitContainer));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckMainContainer() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getMainContainer().getName(), 
is(JOB_MANAGER_MAIN_CONTAINER_NAME));
+        final VolumeMount[] expectedVolumeMounts =
+                new VolumeMount[] {
+                    new VolumeMountBuilder()
+                            .withName("flink-volume-hostpath")
+                            .withMountPath("/opt/flink/volumes/hostpath")
+                            .build(),
+                    new VolumeMountBuilder()
+                            .withName("flink-artifact")
+                            .withMountPath("/opt/flink/artifacts")
+                            .build(),
+                    new VolumeMountBuilder()
+                            .withName("flink-logs")
+                            .withMountPath("/opt/flink/log")
+                            .build()
+                };
+        assertThat(
+                flinkPod.getMainContainer().getVolumeMounts(),
+                containsInAnyOrder(expectedVolumeMounts));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckSideCarContainer() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod().getSpec().getContainers().size(), is(1));
+        Container expectedSideCarContainer =
+                new ContainerBuilder()
+                        .withName("sidecar-log-collector")
+                        .withImage("busybox")
+                        .withCommand("command-to-upload", 
"/flink-logs/jobmanager.log")
+                        .withVolumeMounts(
+                                new VolumeMountBuilder()
+                                        .withName("flink-logs")
+                                        .withMountPath("/flink-logs")
+                                        .build())
+                        .build();
+        assertThat(
+                flinkPod.getPod().getSpec().getContainers().get(0), 
is(expectedSideCarContainer));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckVolumes() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesTestUtils.getJobManagerPodTemplatePath(),
+                        JOB_MANAGER_MAIN_CONTAINER_NAME);

Review comment:
       That reveals another issue: can a user point both JM/TM options to the 
same template file? I think that should be a very common demand. We may also 
provide a common config option in addition to the JM/TM specific options, 
similar to what we do for `env.java.opts`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to