Very good point. We are actually talking about the same high-level approach, 
where the Task Manager Pod has two containers running inside it: one is the 
task manager container and the other is the worker pool service container.
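
For concreteness, here is a rough sketch of that layout as a plain Kubernetes 
pod spec. This is only an illustration of the two-container arrangement; the 
pod name, image tags, and port 50000 are placeholder assumptions, not taken 
from any particular deployment:

apiVersion: v1
kind: Pod
metadata:
  name: taskmanager-with-worker-pool    # illustrative name
spec:
  containers:
    - name: taskmanager
      image: flink:1.12                 # runner (task manager) container
      args: ["taskmanager"]
    - name: beam-worker-pool
      image: apache/beam_python3.8_sdk  # Beam SDK harness image (assumed tag)
      args: ["--worker_pool"]           # run the SDK container as a worker pool service
      ports:
        - containerPort: 50000          # default worker pool port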

I believe the disconnect lies in how a job is deployed and started. In the 
GCP Flink operator example, it is completely true that the likelihood of the 
worker pool service not being available when the task manager container needs 
to connect to it is extremely low. That is because the worker pool service is 
brought up together with the Flink cluster, which happens before the job 
submission.

However, not all runners follow the pattern where a predefined number of 
workers is brought up before job submission. For the Samza runner, for 
example, the number of workers needed for a job is determined only after the 
job submission happens. In that case the Samza worker Pod, which is similar 
to the “Task Manager Pod” in Flink, is brought up after job submission, and 
the runner container in this Pod needs to connect to the worker pool service 
at a much earlier time.

In addition, if I understand correctly, Flink is planning to add support for 
dynamically adding new task managers after job submission [1]. In that case, 
the task manager container and the worker pool service container in the same 
Task Manager Pod would be started together while a job is already running, 
and the task manager container would need to connect to the worker pool 
service container sooner.
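
When the two containers do come up together, on top of whatever retry/error 
handling we add on the Beam side, it can also help to let Kubernetes report 
when the worker pool is actually accepting connections. A minimal sketch, 
assuming the worker pool listens on port 50000, is a TCP readiness probe on 
the sidecar container (this does not delay the task manager container's 
start, it only delays the pod being reported Ready, so it complements rather 
than replaces connection retries in the runner container):

    - name: beam-worker-pool
      image: apache/beam_python3.8_sdk   # assumed image tag
      args: ["--worker_pool"]
      ports:
        - containerPort: 50000
      readinessProbe:
        tcpSocket:
          port: 50000                    # succeeds once the pool accepts TCP connections
        initialDelaySeconds: 5
        periodSeconds: 5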

Hope this clarifies things. Let me know if you have more questions.

Best,
Ke

[1] https://issues.apache.org/jira/browse/FLINK-10407

> On Jun 2, 2021, at 4:27 PM, Kyle Weaver <[email protected]> wrote:
> 
> Therefore, if we bring up the external worker pool container together with 
> the runner container, which is one of the supported approaches for the Flink 
> Runner on K8s
> 
> Exactly which approach are you talking about in the doc? I feel like there 
> could be some misunderstanding here. Here is the configuration I'm talking 
> about: 
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
> 
> Basically this config is describing a Flink task manager with a Beam worker 
> pool sidecar. The user invokes it with:
> 
> kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
> 
> It doesn't matter which container is started first, the task manager 
> container or the worker pool sidecar, because no communication between the 
> two containers is necessary at this time.
> 
> The instructions are to start the cluster first and wait until it is ready to 
> submit a job, e.g.:
> 
> kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
> 
> The task manager only sends the worker pool requests once it's running a job. 
> So for things to go wrong in the way you describe:
> 
> 1. The user submits a job, then starts a Flink cluster -- reversing the order 
> of steps in the instructions.
> 2. The worker pool sidecar takes way longer to start up than the task manager 
> container for some reason.
> 3. The Flink cluster accepts and starts running the job before the worker 
> pool sidecar is ready -- I'm not familiar enough with k8s lifecycle 
> management or the Flink operator implementation to be sure if this is even 
> possible.
> 
> I've never seen this happen. But, of course there are plenty of unforeseen 
> ways things can go wrong. So I'm not opposed to improving our error handling 
> here more generally.
> 
