gyfora commented on a change in pull request #17582:
URL: https://github.com/apache/flink/pull/17582#discussion_r745441165



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java
##########
@@ -31,16 +32,24 @@
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
 import org.apache.flink.util.Preconditions;
 
+import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /** Utility class for constructing the TaskManager Pod on the JobManager. */
 public class KubernetesTaskManagerFactory {
 
-    public static KubernetesPod buildTaskManagerKubernetesPod(
-            FlinkPod podTemplate, KubernetesTaskManagerParameters 
kubernetesTaskManagerParameters) {
+    public static KubernetesTaskManagerSpecification 
buildTaskManagerKubernetesPod(

Review comment:
       This method should now be renamed to 
`buildKubernetesTaskManagerSpecification`

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -155,43 +156,49 @@ public void deregisterApplication(
             TaskExecutorProcessSpec taskExecutorProcessSpec) {
         final KubernetesTaskManagerParameters parameters =
                 createKubernetesTaskManagerParameters(taskExecutorProcessSpec);
-        final KubernetesPod taskManagerPod =
-                KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(
-                        taskManagerPodTemplate, parameters);
-        final String podName = taskManagerPod.getName();
         final CompletableFuture<KubernetesWorkerNode> requestResourceFuture =
                 new CompletableFuture<>();
 
-        requestResourceFutures.put(podName, requestResourceFuture);
-
-        log.info(
-                "Creating new TaskManager pod with name {} and resource 
<{},{}>.",
-                podName,
-                parameters.getTaskManagerMemoryMB(),
-                parameters.getTaskManagerCPU());
-
-        final CompletableFuture<Void> createPodFuture =
-                flinkKubeClient.createTaskManagerPod(taskManagerPod);
-
-        FutureUtils.assertNoException(
-                createPodFuture.handleAsync(
-                        (ignore, exception) -> {
-                            if (exception != null) {
-                                log.warn(
-                                        "Could not create pod {}, exception: 
{}",
-                                        podName,
-                                        exception);
-                                CompletableFuture<KubernetesWorkerNode> future 
=
-                                        
requestResourceFutures.remove(taskManagerPod.getName());
-                                if (future != null) {
-                                    future.completeExceptionally(exception);
+        try {
+            final KubernetesTaskManagerSpecification taskManagerSpec =
+                    KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(

Review comment:
       Why are we now wrapping everything in the try-catch block instead of 
only the `buildTaskManagerKubernetesPod` part? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to