There is also a talk [1] that introduces dynamic scaling of a stream 
processing job at LinkedIn with the Samza runner.

[1] https://www.usenix.org/conference/hotcloud20/presentation/singh

> On Jun 3, 2021, at 1:59 PM, Ke Wu <[email protected]> wrote:
> 
> Also, are there any resources I can use to find out more about how horizontal 
> scaling works in Samza?
> 
> It is a configuration [1] passed along with job submission. The Job 
> Coordinator, similar to the Job Manager in Flink, then asks the YARN 
> Resource Manager to allocate containers, or the Kubernetes API server to 
> allocate Pods.
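> 
> For illustration, the container count is a single line in the job's 
> .properties config (the value 4 below is just an example):
> 
>     job.container.count=4
> 
> The Job Coordinator reads this at submission time and requests that many 
> containers or Pods from the cluster manager.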
> 
> configure the job to use the PROCESS environment.
> 
> Agreed that a custom image with a fat jar inside + the PROCESS environment 
> works too, but we prefer the EXTERNAL environment because it gives us 
> isolation between the runner and the SDK worker: the runner container can 
> run entirely from a framework image.
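> 
> As a sketch, a portable pipeline opts into the EXTERNAL environment with 
> flags like the following (the endpoint is an example; 50000 is the worker 
> pool's default port):
> 
>     --environment_type=EXTERNAL --environment_config=localhost:50000
> 
> With this, the runner asks the worker pool at that address to start SDK 
> workers instead of launching them itself.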
> 
> [1] https://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html#job-container-count
> 
> On Thu, Jun 3, 2021 at 12:45 PM Kyle Weaver <[email protected]> wrote:
> However, not all runners follow the pattern where a predefined number of 
> workers are brought up before job submission. For the Samza runner, for 
> example, the number of workers needed for a job is determined after job 
> submission happens. In that case the Samza worker Pod, which is similar to 
> the Task Manager Pod in Flink, is brought up (with both containers together) 
> only after job submission, and the runner container in this Pod needs to 
> connect to the worker pool service at a much earlier time.
>  
> Makes sense. In that case, the best alternative to worker pools is probably 
> to create a custom Samza/Flink worker container image that includes whatever 
> dependencies are necessary to run the Beam user code, and then configure the 
> job to use the PROCESS environment.
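> 
> As a sketch, the PROCESS environment would then point at the SDK harness 
> boot binary baked into that custom image, e.g. (the boot path below is the 
> usual location in Beam SDK images, but it depends on how the image is built):
> 
>     --environment_type=PROCESS --environment_config='{"command": "/opt/apache/beam/boot"}'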
> 
> Also, are there any resources I can use to find out more about how horizontal 
> scaling works in Samza?
> 
> On Wed, Jun 2, 2021 at 6:39 PM Ke Wu <[email protected]> wrote:
> Very good point. We are actually talking about the same high-level approach, 
> where the Task Manager Pod runs two containers: one is the task manager 
> container and the other is the worker pool service container.
> 
> I believe the disconnect probably lies in how a job is deployed/started. In 
> the GCP Flink operator example, it is completely true that the worker pool 
> service is very unlikely to be unavailable when the task manager container 
> needs to connect to it. That is because the worker pool service is brought 
> up together with the Flink cluster, before job submission even happens.
> 
> However, not all runners follow the pattern where a predefined number of 
> workers are brought up before job submission. For the Samza runner, for 
> example, the number of workers needed for a job is determined after job 
> submission happens. In that case the Samza worker Pod, which is similar to 
> the Task Manager Pod in Flink, is brought up (with both containers together) 
> only after job submission, and the runner container in this Pod needs to 
> connect to the worker pool service at a much earlier time.
> 
> In addition, if I understand correctly, Flink is planning to add support for 
> dynamically adding new task managers after job submission [1]. In that case 
> the task manager container and the worker pool service container in the same 
> Task Manager Pod could be started together, and the task manager container 
> would need to connect to the worker pool service container sooner.
> 
> Hope this clarifies things better. Let me know if you have more questions.
> 
> Best,
> Ke
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10407
> 
>> On Jun 2, 2021, at 4:27 PM, Kyle Weaver <[email protected]> wrote:
>> 
>> Therefore, if we bring up the external worker pool container together with 
>> the runner container, which is one of the approaches supported by the Flink 
>> runner on K8s
>> 
>> Exactly which approach are you talking about in the doc? I feel like there 
>> could be some misunderstanding here. Here is the configuration I'm talking 
>> about: 
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>> 
>> Basically this config describes a Flink task manager with a Beam worker 
>> pool sidecar. The user invokes it with:
>> 
>> kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
>> 
>> It doesn't matter which container is started first, the task manager 
>> container or the worker pool sidecar, because no communication between the 
>> two containers is necessary at this time.
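>> 
>> For reference, the worker pool sidecar in that manifest is just the Beam 
>> Python SDK container started in worker pool mode, along these lines (the 
>> image tag is an example; by default it serves SDK workers on port 50000):
>> 
>>     image: apache/beam_python3.7_sdk:2.25.0
>>     args: ["--worker_pool"]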
>> 
>> The instructions are to start the cluster first and wait until it is ready 
>> before submitting a job, e.g.:
>> 
>> kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
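>> 
>> In that manifest, the wordcount job's args point the pipeline at the 
>> sidecar, roughly like this (flags abridged from the linked example):
>> 
>>     --runner=FlinkRunner --environment_type=EXTERNAL --environment_config=localhost:50000
>> 
>> localhost works here because the worker pool runs as a sidecar in the same 
>> Pod as the task manager that executes the work.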
>> 
>> The task manager only sends requests to the worker pool once it's running a 
>> job. So for things to go wrong in the way you describe:
>> 
>> 1. The user submits a job, then starts a Flink cluster -- reversing the 
>> order of steps in the instructions.
>> 2. The worker pool sidecar takes way longer to start up than the task 
>> manager container for some reason.
>> 3. The Flink cluster accepts and starts running the job before the worker 
>> pool sidecar is ready -- I'm not familiar enough with k8s lifecycle 
>> management or the Flink operator implementation to be sure if this is even 
>> possible.
>> 
>> I've never seen this happen. But of course there are plenty of unforeseen 
>> ways things can go wrong, so I'm not opposed to improving our error handling 
>> here more generally.
>> 
> 
