wangyang0918 commented on a change in pull request #18854:
URL: https://github.com/apache/flink/pull/18854#discussion_r824350934



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -130,15 +132,16 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet
 
     @Override
     public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
+        if (masterDeployment == null) {

Review comment:
       This network call will block the main thread of the Flink ResourceManager. Also, after this change, `Fabric8FlinkKubeClient` is no longer thread-safe.
   
   So I suggest using `private final AtomicReference<Deployment> masterDeploymentRef` to store the JM deployment. We could also run the get-and-update of the reference on the `kubeClientExecutorService`.
   
   WDYT?
   
   ```
       @Override
       public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
           return CompletableFuture.runAsync(
                   () -> {
                       if (masterDeploymentRef.get() == null) {
                           final Deployment masterDeployment =
                                   this.internalClient
                                           .apps()
                                           .deployments()
                                           .withName(KubernetesUtils.getDeploymentName(clusterId))
                                           .get();
   
                           if (masterDeployment == null) {
                               throw new RuntimeException(
                                       "Failed to find Deployment named "
                                               + clusterId
                                               + " in namespace "
                                               + this.namespace);
                           }
   
                           masterDeploymentRef.compareAndSet(null, masterDeployment);
                       }
   
                       // Note that we should use the uid of the master Deployment for the
                       // OwnerReference.
                       setOwnerReference(
                               checkNotNull(masterDeploymentRef.get()),
                               Collections.singletonList(kubernetesPod.getInternalResource()));
   
                       LOG.debug(
                               "Start to create pod with spec {}{}",
                               System.lineSeparator(),
                               KubernetesUtils.tryToGetPrettyPrintYaml(
                                       kubernetesPod.getInternalResource()));
   
                       this.internalClient.pods().create(kubernetesPod.getInternalResource());
                   },
                   kubeClientExecutorService);
       }
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -130,15 +132,16 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet
 
     @Override
     public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {

Review comment:
       We also need to add a test to guard this change.
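
As a rough illustration of what such a test might assert, here is a minimal, dependency-free sketch. `CachingClient`, `remoteLookup` and `ioExecutor` are hypothetical stand-ins for `Fabric8FlinkKubeClient`, the Deployment lookup and `kubeClientExecutorService`; none of this is the actual Flink or Fabric8 API. It checks that the lookup runs on the executor rather than on the calling thread, and that the cached result is reused on the second call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class LazyOwnerLookupSketch {

    /** Hypothetical stand-in for the kube client; not Flink code. */
    static final class CachingClient {
        private final AtomicReference<String> ownerRef = new AtomicReference<>();
        private final Supplier<String> remoteLookup;
        private final ExecutorService ioExecutor;

        CachingClient(Supplier<String> remoteLookup, ExecutorService ioExecutor) {
            this.remoteLookup = remoteLookup;
            this.ioExecutor = ioExecutor;
        }

        /** Resolves the owner lazily on the IO executor, then "creates" the pod. */
        CompletableFuture<String> createPod(String podName) {
            return CompletableFuture.supplyAsync(
                    () -> {
                        if (ownerRef.get() == null) {
                            String owner = remoteLookup.get();
                            if (owner == null) {
                                throw new IllegalStateException("Owner not found");
                            }
                            // Only the first successful lookup is kept.
                            ownerRef.compareAndSet(null, owner);
                        }
                        return podName + " owned by " + ownerRef.get();
                    },
                    ioExecutor);
        }
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger lookups = new AtomicInteger();
        AtomicReference<Thread> lookupThread = new AtomicReference<>();
        CachingClient client =
                new CachingClient(
                        () -> {
                            lookups.incrementAndGet();
                            lookupThread.set(Thread.currentThread());
                            return "uid-1";
                        },
                        ioExecutor);
        try {
            String first = client.createPod("tm-1").join();
            String second = client.createPod("tm-2").join();

            check(first.equals("tm-1 owned by uid-1"), "unexpected first result");
            check(second.equals("tm-2 owned by uid-1"), "unexpected second result");
            check(lookups.get() == 1, "lookup should be cached after the first call");
            check(lookupThread.get() != Thread.currentThread(),
                    "lookup must not run on the calling thread");
        } finally {
            ioExecutor.shutdownNow();
        }
    }
}
```

The sketch uses a single-threaded executor, so the lookup happens exactly once. With a multi-threaded executor, two concurrent calls could both see an empty reference and both perform the lookup; `compareAndSet` would still keep only the first result, so a real test should probably not assert an exact count in that setup.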




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
