Very good point. We are actually talking about the same high-level approach, where the Task Manager Pod runs two containers: one is the task manager container and the other is the worker pool service container.
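
For concreteness, that pod layout would look roughly like the sketch below. The container names, image tags, and the worker pool port are only illustrative, loosely following the Flink operator example you linked; in practice the spec is generated by the operator rather than written by hand:

apiVersion: v1
kind: Pod
metadata:
  name: flink-taskmanager
spec:
  containers:
    # The Flink task manager itself.
    - name: taskmanager
      image: flink:1.12                         # illustrative tag
    # The Beam worker pool running as a sidecar in the same Pod.
    # --worker_pool tells the Beam SDK container to serve SDK harnesses
    # for the task manager instead of running a single harness itself.
    - name: beam-worker-pool
      image: apache/beam_python3.7_sdk:2.31.0   # illustrative tag
      args: ["--worker_pool"]
      ports:
        - containerPort: 50000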
I believe the disconnect lies in how a job is deployed/started. In the GCP Flink operator example, you are right that the worker pool service is very unlikely to be unavailable when the task manager container needs to connect to it, because the worker pool service is brought up together with the Flink cluster, before job submission even happens.

However, not all runners bring up a predefined number of workers before job submission. For the Samza runner, for example, the number of workers needed for a job is determined after job submission, so the Samza worker Pod (the analogue of the "Task Manager Pod" in Flink) is only brought up after job submission, and the runner container in that Pod needs to connect to the worker pool service much earlier in its lifetime. In addition, if I understand correctly, Flink plans to support dynamically adding new task managers after job submission [1]. In that case, the task manager container and the worker pool service container in the same Task Manager Pod would be started together, and the task manager container would need to connect to the worker pool service container sooner.

Hope this clarifies things better. Let me know if you have more questions.

Best,
Ke

[1] https://issues.apache.org/jira/browse/FLINK-10407

> On Jun 2, 2021, at 4:27 PM, Kyle Weaver <[email protected]> wrote:
>
> > Therefore, if we bring up the external worker pool container together with the runner container, which is one of the supported approaches by the Flink Runner on K8s
>
> Exactly which approach are you talking about in the doc? I feel like there could be some misunderstanding here. Here is the configuration I'm talking about:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>
> Basically this config is describing a Flink task manager with a Beam worker pool sidecar. The user invokes it with:
>
> kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
>
> It doesn't matter which container is started first, the task manager container or the worker pool sidecar, because no communication between the two containers is necessary at this time.
>
> The instructions are to start the cluster first and wait until it is ready to submit a job, e.g.:
>
> kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
>
> The task manager only sends the worker pool requests once it's running a job. So for things to go wrong in the way you describe:
>
> 1. The user submits a job, then starts a Flink cluster -- reversing the order of steps in the instructions.
> 2. The worker pool sidecar takes way longer to start up than the task manager container for some reason.
> 3. The Flink cluster accepts and starts running the job before the worker pool sidecar is ready -- I'm not familiar enough with k8s lifecycle management or the Flink operator implementation to be sure if this is even possible.
>
> I've never seen this happen. But, of course there are plenty of unforeseen ways things can go wrong. So I'm not opposed to improving our error handling here more generally.
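
P.S. On the error-handling point at the end of your message: what I have in mind on the runner side is roughly the retry sketch below. It is purely illustrative; the endpoint address, the retry counts/timeouts, and the use of grpc.channel_ready_future are my own assumptions for the sketch, not how the Beam runner code is currently written.

import logging
import time

import grpc

# Illustrative endpoint: the worker pool sidecar in the operator example
# listens on port 50000 in the same Pod.
WORKER_POOL_ADDRESS = "localhost:50000"

def wait_for_worker_pool(address=WORKER_POOL_ADDRESS, attempts=10, per_attempt_timeout_s=5):
    # Block until the worker pool endpoint accepts connections, retrying a
    # bounded number of times instead of failing on the first refusal.
    channel = grpc.insecure_channel(address)
    for attempt in range(1, attempts + 1):
        try:
            # Resolves once the channel reaches the READY state.
            grpc.channel_ready_future(channel).result(timeout=per_attempt_timeout_s)
            logging.info("Worker pool at %s is reachable.", address)
            return channel
        except grpc.FutureTimeoutError:
            logging.warning(
                "Worker pool at %s not ready (attempt %d/%d), retrying...",
                address, attempt, attempts)
            time.sleep(1)
    raise RuntimeError("Worker pool at %s never became reachable" % address)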
