xintongsong commented on a change in pull request #11323:
URL: https://github.com/apache/flink/pull/11323#discussion_r413842746
##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -320,5 +333,16 @@ private void internalStopPod(String podName) {
}
}
);
+
+		final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(resourceId);
+		final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName);
+
+		// If the stopped pod is requested in the current attempt (workerResourceSpec is known) and is not yet added,
+		// we need to notify ActiveResourceManager to decrease the pending worker count.
+		if (workerResourceSpec != null && kubernetesWorkerNode == null) {
Review comment:
I think what we need to restart are the workers that have been requested by the
slot manager but are not yet registered with it. In such cases, the slot manager
does not know that these pods have failed and will keep expecting them to
register (see the sketch below the list).
For a worker that is already registered, in case of failure:
- If there are slots on the failed worker that are already in use, the JM will
realize that the slots are lost and send out new slot requests, which will
trigger starting new workers if needed.
- If none of the slots on the failed worker is in use, then we don't lose
anything. New workers will be started when new slot requests arrive and cannot
be satisfied by registered free slots.
For a pod recovered from the previous attempt, if it is not yet registered, the
slot manager will not be aware of it at all. Thus, the slot manager should have
already requested enough workers to satisfy the slot requests even without this
failed pod.
Moreover, we have not increased the `pendingWorkerCounter` for the recovered
pod, and we cannot do that because we don't know its `workerResourceSpec` until
it is registered. So we should also not decrease the `pendingWorkerCounter`
when the recovered pod is added/failed, otherwise the `pendingWorkerCounter`
will go out of sync. I guess that is exactly the reason why we also decreased
the pending count for recovered pods before these changes: previously we
assumed all the pods were identical.
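For illustration, here is a minimal, self-contained sketch of the bookkeeping
argued for above. The class and method names are hypothetical (this is not the
actual implementation); it only shows that the pending counter is touched solely
for pods whose `WorkerResourceSpec` is known, i.e. pods requested in the current
attempt that have not yet been added as workers:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the pending-worker bookkeeping described above. */
class PendingWorkerBookkeeping {
	/** Stand-in for WorkerResourceSpec; the real spec carries the worker's resource profile. */
	static final class WorkerSpec { }

	private final Map<String, WorkerSpec> podWorkerResources = new HashMap<>();
	private final Map<String, Object> workerNodes = new HashMap<>();
	private int pendingWorkerCount;

	/** Called when a pod is requested in the current attempt. */
	void onPodRequested(String podName, WorkerSpec spec) {
		podWorkerResources.put(podName, spec);
		pendingWorkerCount++;
	}

	/** Called when a pod is stopped or observed as terminated. */
	void onPodStopped(String podName, String resourceId) {
		final Object workerNode = workerNodes.remove(resourceId);
		final WorkerSpec spec = podWorkerResources.remove(podName);
		// Recovered pods never increased the counter (their spec is unknown until
		// registration), so only a requested-but-not-yet-added pod decreases it.
		if (spec != null && workerNode == null) {
			pendingWorkerCount--;
		}
	}
}
```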