>
> However, not all runners follow the pattern where a predefined number of
> workers is brought up before job submission. For example, with the Samza
> runner, the number of workers needed for a job is determined after job
> submission. In that case the Samza worker Pod, which is similar to the
> “Task Manager Pod” in Flink, is brought up, with all of its containers
> together, after job submission, and the runner container in this Pod needs
> to connect to the worker pool service much sooner.


Makes sense. In that case, the best alternative to worker pools is probably
to create a custom Samza/Flink worker container image that includes
whatever dependencies are necessary to run the Beam user code, and then
configure the job to use the PROCESS environment.
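
For reference, here is a minimal sketch of how a Python pipeline could
select the PROCESS environment. The flag names and the "command" key follow
Beam's documented portable pipeline options as far as I know; the helper
name and the boot path are assumptions for illustration, and the path
depends on where your custom image installs the Beam SDK boot binary.

```python
import json


def process_environment_args(boot_command, env=None):
    """Build pipeline flags that select Beam's PROCESS environment.

    boot_command: path, inside the worker image, of the SDK harness boot
        executable (hypothetical here; it depends on the custom image).
    env: optional environment variables to pass to the harness process.
    """
    config = {"command": boot_command}
    if env:
        config["env"] = dict(env)
    return [
        "--environment_type=PROCESS",
        "--environment_config=" + json.dumps(config, sort_keys=True),
    ]


if __name__ == "__main__":
    # Assumed location of the boot binary in the custom worker image.
    for flag in process_environment_args("/opt/apache/beam/boot"):
        print(flag)
```

These flags would be passed along with the usual runner options when the
pipeline is launched. Since the SDK harness is started as a process inside
the worker container, there is no separate worker pool service to wait for.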

Also, are there any resources I can use to find out more about how
horizontal scaling works in Samza?

On Wed, Jun 2, 2021 at 6:39 PM Ke Wu <[email protected]> wrote:

> Very good point. We are actually talking about the same high-level
> approach, where the Task Manager Pod runs two containers: one is the task
> manager container and the other is the worker pool service container.
>
> I believe the disconnect probably lies in how a job is
> deployed/started. In the GCP Flink operator example, it is completely true
> that the likelihood that the worker pool service is unavailable when the
> task manager container needs to connect to it is extremely low. This is
> because the worker pool service is brought up together with the Flink
> cluster, which happens before the job is even submitted.
>
> However, not all runners follow the pattern where a predefined number of
> workers is brought up before job submission. For example, with the Samza
> runner, the number of workers needed for a job is determined after job
> submission. In that case the Samza worker Pod, which is similar to the
> “Task Manager Pod” in Flink, is brought up, with all of its containers
> together, after job submission, and the runner container in this Pod needs
> to connect to the worker pool service much sooner.
>
> In addition, if I understand correctly, Flink is planning to add support
> for dynamically adding new task managers after job submission [1]. In that
> case, the task manager container and the worker pool service container in
> the same Task Manager Pod could be started together, and the task manager
> container would need to connect to the worker pool service container sooner.
>
> Hope this clarifies things better. Let me know if you have more questions.
>
> Best,
> Ke
>
> [1] https://issues.apache.org/jira/browse/FLINK-10407
>
> On Jun 2, 2021, at 4:27 PM, Kyle Weaver <[email protected]> wrote:
>
>> Therefore, if we bring up the external worker pool container together with
>> the runner container, which is one of the approaches supported by the Flink
>> Runner on K8s
>
>
> Exactly which approach are you talking about in the doc? I feel like there
> could be some misunderstanding here. Here is the configuration I'm talking
> about:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>
> Basically this config is describing a Flink task manager with a Beam
> worker pool sidecar. The user invokes it with:
>
> kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
>
> It doesn't matter which container is started first, the task manager
> container or the worker pool sidecar, because no communication between the
> two containers is necessary at this time.
>
> The instructions are to start the cluster first and wait until it is ready
> to submit a job, e.g.:
>
> kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
>
> The task manager only sends the worker pool requests once it's running a
> job. So for things to go wrong in the way you describe:
>
> 1. The user submits a job, then starts a Flink cluster -- reversing the
> order of steps in the instructions.
> 2. The worker pool sidecar takes way longer to start up than the task
> manager container for some reason.
> 3. The Flink cluster accepts and starts running the job before the worker
> pool sidecar is ready -- I'm not familiar enough with k8s lifecycle
> management or the Flink operator implementation to be sure if this is even
> possible.
>
> I've never seen this happen. But, of course there are plenty of unforeseen
> ways things can go wrong. So I'm not opposed to improving our error
> handling here more generally.
>
>
>
