zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s]
Make KubernetesResourceManager starts workers using WorkerResourceSpec
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r410661728
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts()
throws ResourceManagerExce
++currentMaxAttemptId);
}
- private void requestKubernetesPod() {
- numPendingPodRequests++;
+ private void requestKubernetesPod(WorkerResourceSpec
workerResourceSpec) {
+ final KubernetesTaskManagerParameters parameters =
+
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+ final KubernetesPod taskManagerPod =
+
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+ kubeClient.createTaskManagerPod(taskManagerPod);
+
+ podWorkerResources.put(parameters.getPodName(),
workerResourceSpec);
+ final int pendingWorkerNum =
notifyNewWorkerRequested(workerResourceSpec);
log.info("Requesting new TaskManager pod with <{},{}>. Number
pending requests {}.",
- defaultMemoryMB,
- defaultCpus,
- numPendingPodRequests);
+ parameters.getTaskManagerMemoryMB(),
+ parameters.getTaskManagerCPU(),
+ pendingWorkerNum);
+ log.info("TaskManager {} will be started with {}.",
parameters.getPodName(), workerResourceSpec);
+ }
+
+ private KubernetesTaskManagerParameters
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+ final TaskExecutorProcessSpec taskExecutorProcessSpec =
+
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig,
workerResourceSpec);
final String podName = String.format(
TASK_MANAGER_POD_FORMAT,
clusterId,
currentMaxAttemptId,
++currentMaxPodId);
+ final ContaineredTaskManagerParameters taskManagerParameters =
+ ContaineredTaskManagerParameters.create(flinkConfig,
taskExecutorProcessSpec);
+
final String dynamicProperties =
BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
- final KubernetesTaskManagerParameters
kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+ return new KubernetesTaskManagerParameters(
flinkConfig,
podName,
dynamicProperties,
taskManagerParameters);
-
- final KubernetesPod taskManagerPod =
-
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
- log.info("TaskManager {} will be started with {}.", podName,
taskExecutorProcessSpec);
- kubeClient.createTaskManagerPod(taskManagerPod);
}
/**
* Request new pod if pending pods cannot satisfy pending slot requests.
*/
- private void requestKubernetesPodIfRequired() {
- final int requiredTaskManagers =
getNumberRequiredTaskManagers();
+ private void requestKubernetesPodIfRequired(WorkerResourceSpec
workerResourceSpec) {
+ final int pendingWorkerNum =
getNumPendingWorkersFor(workerResourceSpec);
+ int requiredTaskManagers =
getRequiredResources().get(workerResourceSpec);
- while (requiredTaskManagers > numPendingPodRequests) {
- requestKubernetesPod();
+ while (requiredTaskManagers-- > pendingWorkerNum) {
+ requestKubernetesPod(workerResourceSpec);
}
}
private void removePodIfTerminated(KubernetesPod pod) {
if (pod.isTerminated()) {
kubeClient.stopPod(pod.getName());
Review comment:
@xintongsong Sorry for the late reply. I accidentally jumped into this PR a
few days ago and currently, I haven't taken a closer look at this PR. But
recently I have two concerns:
1. Maybe our handle logic should not rely on the `EVENT` order since there
is no guarantee for this, also, the `EVENT` from the `Watcher` stream may not
have exactly-once semantics, we may lose some of them or receive the same
`EVNET` more than once.
2. We rely heavily on the informer of `Watcher` to track the state of the
Pods, I am not entirely sure that this is a good solution, maybe we need an
active polling mechanism as a supplement to get the whole view of the Pods. Of
course, it needs further discussion and could be resolved in another ticket if
necessary, which is out of the scope of this PR.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services