> However, not all runners follow the pattern where a predefined number of workers is brought up before job submission. For example, for the Samza runner, the number of workers needed for a job is determined after job submission happens, in which case the Samza worker Pod, which is similar to the "Task Manager Pod" in Flink, is brought up after job submission, and the runner container in this Pod needs to connect to the worker pool service at a much earlier time.
Makes sense. In that case, the best alternative to worker pools is probably to create a custom Samza/Flink worker container image that includes whatever dependencies are necessary to run the Beam user code, and then configure the job to use the PROCESS environment (a rough sketch of the pipeline options is at the end of this message). Also, are there any resources I can use to find out more about how horizontal scaling works in Samza?

On Wed, Jun 2, 2021 at 6:39 PM Ke Wu <[email protected]> wrote:

> Very good point. We are actually talking about the same high-level approach, where the Task Manager Pod has two containers running inside: one is the task manager container and the other is the worker pool service container.
>
> I believe the disconnect probably lies in how a job is deployed/started. In the GCP Flink operator example, it is completely true that the likelihood that the worker pool service is not available when the task manager container needs to connect to it is extremely low. That is because the worker pool service is brought up together with the Flink cluster, which happens before the job submission.
>
> However, not all runners follow the pattern where a predefined number of workers is brought up before job submission. For example, for the Samza runner, the number of workers needed for a job is determined after job submission happens, in which case the Samza worker Pod, which is similar to the "Task Manager Pod" in Flink, is brought up after job submission, and the runner container in this Pod needs to connect to the worker pool service at a much earlier time.
>
> In addition, if I understand correctly, Flink is planning to add support for dynamically adding new task managers after job submission [1], in which case the task manager container and the worker pool service container in the same Task Manager Pod could be started together, and the task manager container would need to connect to the worker pool service container sooner.
>
> Hope this clarifies things better. Let me know if you have more questions.
>
> Best,
> Ke
>
> [1] https://issues.apache.org/jira/browse/FLINK-10407
>
> On Jun 2, 2021, at 4:27 PM, Kyle Weaver <[email protected]> wrote:
>
> > Therefore, if we bring up the external worker pool container together with the runner container, which is one of the supported approaches for the Flink Runner on K8s
>
> Exactly which approach are you talking about in the doc? I feel like there could be some misunderstanding here. Here is the configuration I'm talking about:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>
> Basically this config describes a Flink task manager with a Beam worker pool sidecar. The user invokes it with:
>
> kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
>
> It doesn't matter which container is started first, the task manager container or the worker pool sidecar, because no communication between the two containers is necessary at this time.
>
> The instructions are to start the cluster first and wait until it is ready before submitting a job, e.g.:
>
> kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
>
> The task manager only sends the worker pool requests once it's running a job. So for things to go wrong in the way you describe:
>
> 1. The user submits a job, then starts a Flink cluster -- reversing the order of steps in the instructions.
> 2. The worker pool sidecar takes far longer to start up than the task manager container for some reason.
> 3. The Flink cluster accepts and starts running the job before the worker pool sidecar is ready -- I'm not familiar enough with k8s lifecycle management or the Flink operator implementation to be sure whether this is even possible.
>
> I've never seen this happen. But, of course, there are plenty of unforeseen ways things can go wrong. So I'm not opposed to improving our error handling here more generally.
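A minimal sketch of the PROCESS-environment approach suggested above, assuming a custom worker image that bundles the user-code dependencies and keeps the standard Beam boot entrypoint at /opt/apache/beam/boot (the job endpoint address is a placeholder):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# PROCESS environment: the runner starts the SDK harness as a local process
# inside the (custom) worker container instead of asking a worker pool for it.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",  # placeholder job service address
    "--environment_type=PROCESS",
    # Assumes the custom image keeps the standard Beam boot script at this path.
    '--environment_config={"command": "/opt/apache/beam/boot"}',
])

with beam.Pipeline(options=options) as p:
    p | beam.Create(["hello", "world"]) | beam.Map(print)
```

With PROCESS there is no separate worker pool container to race against, which is why it sidesteps the startup-ordering concern discussed in this thread.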

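For comparison, the worker pool sidecar setup from the flink-on-k8s-operator example above corresponds roughly to submitting the job with the EXTERNAL environment pointed at the sidecar's address; the port 50000 and the job endpoint below are assumptions based on that example and may differ in a real deployment:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# EXTERNAL environment: the runner requests SDK workers from an
# already-running worker pool (the sidecar in the task manager Pod).
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=flink-jobserver:8099",  # placeholder job service address
    "--environment_type=EXTERNAL",
    # Worker pool address as seen from the task manager container; localhost
    # works because the two containers share the Pod's network namespace.
    "--environment_config=localhost:50000",
])

with beam.Pipeline(options=options) as p:
    p | beam.Create(["hello", "world"]) | beam.Map(print)
```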