wangyang0918 commented on a change in pull request #18854:
URL: https://github.com/apache/flink/pull/18854#discussion_r824350934
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -130,15 +132,16 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet
@Override
public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
+ if (masterDeployment == null) {
Review comment:
The network operation will block the main thread of the Flink
ResourceManager. Also, after this change, `Fabric8FlinkKubeClient` is no
longer thread-safe.
So I suggest using a `private final AtomicReference<Deployment>
masterDeploymentRef` to store the JM deployment. We could also run the
get-and-update of the reference on the `kubeClientExecutorService`.
WDYT?
```
@Override
public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
    return CompletableFuture.runAsync(
            () -> {
                if (masterDeploymentRef.get() == null) {
                    final Deployment masterDeployment =
                            this.internalClient
                                    .apps()
                                    .deployments()
                                    .withName(KubernetesUtils.getDeploymentName(clusterId))
                                    .get();
                    if (masterDeployment == null) {
                        throw new RuntimeException(
                                "Failed to find Deployment named "
                                        + clusterId
                                        + " in namespace "
                                        + this.namespace);
                    }
                    masterDeploymentRef.compareAndSet(null, masterDeployment);
                }
                // Note that we should use the uid of the master Deployment for the
                // OwnerReference.
                setOwnerReference(
                        checkNotNull(masterDeploymentRef.get()),
                        Collections.singletonList(kubernetesPod.getInternalResource()));
                LOG.debug(
                        "Start to create pod with spec {}{}",
                        System.lineSeparator(),
                        KubernetesUtils.tryToGetPrettyPrintYaml(
                                kubernetesPod.getInternalResource()));
                this.internalClient.pods().create(kubernetesPod.getInternalResource());
            },
            kubeClientExecutorService);
}
```
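To illustrate the idea in isolation, here is a minimal, self-contained sketch of the same pattern: a lazily fetched value cached in an `AtomicReference`, with the fetch running on a dedicated executor instead of the caller's thread. It is not Flink code. `CachedLookupSketch`, `remoteLookup`, `ioExecutor`, and the `"uid-1234"` value are hypothetical stand-ins for the client, the Deployment GET call, `kubeClientExecutorService`, and the Deployment itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical stand-in for the kube client; only shows the caching pattern. */
class CachedLookupSketch {
    // Plays the role of masterDeploymentRef: null until the first successful fetch.
    private final AtomicReference<String> cachedRef = new AtomicReference<>();
    // Plays the role of the blocking network call that fetches the Deployment.
    private final Supplier<String> remoteLookup;
    // Plays the role of kubeClientExecutorService.
    private final ExecutorService ioExecutor;

    CachedLookupSketch(Supplier<String> remoteLookup, ExecutorService ioExecutor) {
        this.remoteLookup = remoteLookup;
        this.ioExecutor = ioExecutor;
    }

    /** Returns the cached value, fetching it on the I/O executor if it is not set yet. */
    CompletableFuture<String> useCached() {
        return CompletableFuture.supplyAsync(
                () -> {
                    if (cachedRef.get() == null) {
                        final String fetched = remoteLookup.get();
                        if (fetched == null) {
                            throw new IllegalStateException("Lookup returned nothing");
                        }
                        // Only the first writer wins; later fetches are discarded.
                        cachedRef.compareAndSet(null, fetched);
                    }
                    return cachedRef.get();
                },
                ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger lookups = new AtomicInteger();
        CachedLookupSketch sketch =
                new CachedLookupSketch(
                        () -> {
                            lookups.incrementAndGet();
                            return "uid-1234";
                        },
                        executor);
        String first = sketch.useCached().join();
        String second = sketch.useCached().join();
        // Prints: uid-1234 uid-1234 lookups=1
        System.out.println(first + " " + second + " lookups=" + lookups.get());
        executor.shutdown();
    }
}
```

With a multi-threaded executor, two concurrent calls could both see `null` and both run the lookup. `compareAndSet(null, ...)` ensures that only the first stored value is kept, so every caller ends up using the same reference.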
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -130,15 +132,16 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet
@Override
public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
Review comment:
We also need to add a test to guard this change.