There is also a talk [1] that introduces dynamically scaling a stream processing job at LinkedIn with the Samza runner.
[1] https://www.usenix.org/conference/hotcloud20/presentation/singh

> On Jun 3, 2021, at 1:59 PM, Ke Wu <[email protected]> wrote:
>
> > Also, are there any resources I can use to find out more about how
> > horizontal scaling works in Samza?
>
> It is a configuration [1] passed along with job submission. The Job
> Coordinator, similar to the Job Manager in Flink, then asks the YARN
> Resource Manager to allocate containers, or the Kubernetes API server to
> allocate Pods.
>
> > configure the job to use the PROCESS environment.
>
> Agreed that a custom image with the fat jar inside + the PROCESS environment
> works too. We prefer the EXTERNAL environment because it gives us isolation
> between the runner and the SDK worker, where the runner container can run
> entirely from a framework image.
>
> [1] https://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html#job-container-count
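For concreteness, the scaling knob referenced in Ke's [1] is a job-level property passed at submission time. A minimal sketch of such a config (the job name and count are illustrative values, not from this thread):

    # Samza job configuration (illustrative)
    job.name=beam-on-samza-example
    # Number of containers (workers) the Job Coordinator asks YARN or the
    # Kubernetes API server to allocate for this job.
    job.container.count=4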
> On Thu, Jun 3, 2021 at 12:45 PM Kyle Weaver <[email protected]> wrote:
>
> > However, not all runners follow the pattern where a predefined number of
> > workers are brought up before job submission. For example, for the Samza
> > runner, the number of workers needed for a job is determined after job
> > submission happens. In that case, the Samza worker Pod, which is similar
> > to the "Task Manager Pod" in Flink, is brought up after job submission,
> > and the runner container in this Pod needs to connect to the worker pool
> > service much earlier.
>
> Makes sense. In that case, the best alternative to worker pools is probably
> to create a custom Samza/Flink worker container image that includes whatever
> dependencies are necessary to run the Beam user code, and then configure the
> job to use the PROCESS environment.
>
> Also, are there any resources I can use to find out more about how
> horizontal scaling works in Samza?
>
> On Wed, Jun 2, 2021 at 6:39 PM Ke Wu <[email protected]> wrote:
>
> Very good point. We are actually talking about the same high-level approach,
> where the Task Manager Pod has two containers running inside: one is the
> task manager container, while the other is the worker pool service container.
>
> I believe the disconnect probably lies in how a job is being
> deployed/started. In the GCP Flink operator example, it is completely true
> that the likelihood that the worker pool service is not available when the
> task manager container needs to connect to it is extremely low. That is
> because the worker pool service is brought up together with the Flink
> cluster, which happens before the job submission.
>
> However, not all runners follow the pattern where a predefined number of
> workers are brought up before job submission. For example, for the Samza
> runner, the number of workers needed for a job is determined after job
> submission happens. In that case, the Samza worker Pod, which is similar to
> the "Task Manager Pod" in Flink, is brought up after job submission, and the
> runner container in this Pod needs to connect to the worker pool service
> much earlier.
>
> In addition, if I understand correctly, Flink is planning to add support for
> dynamically adding new task managers after job submission [1], in which case
> the task manager container and the worker pool service container in the same
> Task Manager Pod could be started together, and the task manager container
> would need to connect to the worker pool service container sooner.
>
> Hope this clarifies things better. Let me know if you have more questions.
>
> Best,
> Ke
>
> [1] https://issues.apache.org/jira/browse/FLINK-10407
>
>> On Jun 2, 2021, at 4:27 PM, Kyle Weaver <[email protected]> wrote:
>>
>> > Therefore, if we bring up the external worker pool container together
>> > with the runner container, which is one of the supported approaches for
>> > the Flink runner on K8s
>>
>> Exactly which approach are you talking about in the doc? I feel like there
>> could be some misunderstanding here. Here is the configuration I'm talking
>> about:
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>>
>> Basically this config is describing a Flink task manager with a Beam worker
>> pool sidecar. The user invokes it with:
>>
>>     kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
>>
>> It doesn't matter which container is started first, the task manager
>> container or the worker pool sidecar, because no communication between the
>> two containers is necessary at this time.
>>
>> The instructions are to start the cluster first and wait until it is ready
>> before submitting a job, e.g.:
>>
>>     kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
>>
>> The task manager only sends the worker pool requests once it's running a
>> job. So for things to go wrong in the way you describe, all of the
>> following would have to happen:
>>
>> 1. The user submits a job, then starts a Flink cluster -- reversing the
>>    order of steps in the instructions.
>> 2. The worker pool sidecar takes far longer to start up than the task
>>    manager container for some reason.
>> 3. The Flink cluster accepts and starts running the job before the worker
>>    pool sidecar is ready -- I'm not familiar enough with k8s lifecycle
>>    management or the Flink operator implementation to be sure whether this
>>    is even possible.
>>
>> I've never seen this happen. But of course there are plenty of unforeseen
>> ways things can go wrong, so I'm not opposed to improving our error
>> handling here more generally.
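To make the sidecar arrangement concrete, the linked beam_flink_cluster.yaml has roughly the following shape. This is an abridged, illustrative sketch rather than a verbatim copy of the linked file; the image tags and replica count in particular are placeholders:

    apiVersion: flinkoperator.k8s.io/v1beta1
    kind: FlinkCluster
    metadata:
      name: beam-flink-cluster
    spec:
      image:
        name: flink:1.8.2              # illustrative tag
      taskManager:
        replicas: 2
        sidecars:
          # Beam SDK worker pool sidecar. The task manager only talks to it
          # once a job is running, so container start order does not matter.
          - name: beam-worker-pool
            image: apache/beam_python3.7_sdk:2.25.0   # illustrative tag
            args: ["--worker_pool"]
            ports:
              - containerPort: 50000
                name: grpc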

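For completeness, the two Beam environment choices debated in this thread map to pipeline options along these lines; the endpoint and boot command shown are the common conventions, given here purely for illustration:

    # EXTERNAL: delegate SDK worker management to a worker pool service,
    # e.g. the beam-worker-pool sidecar above listening on port 50000.
    --environment_type=EXTERNAL
    --environment_config=localhost:50000

    # PROCESS: the runner starts SDK workers itself, so the worker image must
    # already bundle the SDK harness and the user code dependencies.
    --environment_type=PROCESS
    --environment_config={"command": "/opt/apache/beam/boot"}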