[ https://issues.apache.org/jira/browse/BEAM-8816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Weise updated BEAM-8816:
-------------------------------
    Description: 
We found skewed utilization of SDK workers causing excessive latency with 
Streaming/Python/Flink. (Remember that with Python we need to run multiple 
worker processes on a machine instead of relying on threads in a single worker, 
which requires the runner to decide which worker to hand each bundle to for 
processing.)
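For context, a minimal sketch of how the size of that worker pool can be raised 
through the portable pipeline options (this assumes the sdk_worker_parallelism 
option on the Java PortablePipelineOptions; the exact name, type, and default 
are version-dependent):

{code:java}
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

public class SdkWorkerCount {
  public static void main(String[] args) {
    // Sketch: ask the runner to start several SDK worker processes per node so
    // that it has a pool of workers to assign bundles to. The option name
    // sdk_worker_parallelism is an assumption based on the portable options.
    PortablePipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
    options.setSdkWorkerParallelism(4L);
  }
}
{code}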

The Flink runner has knobs to influence the number of records per bundle and 
the maximum duration of a bundle. But since the runner does not know the cost 
of individual records, bundle durations can fluctuate significantly due to skew 
in the processing time of individual records. And unless the bundle size is 1, 
multiple expensive records can be allocated to a single bundle before the 
cutoff time is reached. We noticed this with a pipeline that executes models, 
but there are other use cases where the cost of individual records varies 
significantly.
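For reference, a minimal sketch of those knobs as exposed on the Java 
FlinkPipelineOptions (option names as in the Flink runner around this release; 
defaults may differ by version):

{code:java}
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BundleKnobs {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // Cut a bundle after this many elements ...
    options.setMaxBundleSize(1000L);
    // ... or after this much wall-clock time (ms), whichever comes first.
    options.setMaxBundleTimeMills(1000L);
  }
}
{code}

Note that neither knob helps once an expensive record is already in a bundle: 
the cutoff is only evaluated between records.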

Additionally, the Flink runner establishes the association between the subtask 
managing an executable stage and the SDK worker during initialization, and that 
association lasts for the duration of the job. In other words, bundles for the 
same executable stage are always sent to the same SDK worker. When the 
execution-time skew is tied to specific keys (stateful processing), this 
further aggravates the issue.

[https://lists.apache.org/thread.html/59c02d8b8ea849c158deb39ad9d83af4d8fcb56570501c7fe8f79bb2@%3Cdev.beam.apache.org%3E]

Long term, this problem can be addressed with SDF (splittable DoFn). Until 
then, an (optional) runner-controlled balancing mechanism has been shown to 
improve performance in internal testing; a sketch of the idea follows.
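A hypothetical illustration of such a mechanism (names invented for this 
sketch, not actual Beam runner code): rather than binding an executable stage 
to one SDK worker at initialization, the runner checks out the least-busy 
worker from a shared pool each time it starts a bundle.

{code:java}
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of runner-side bundle load balancing, not Beam code.
class LoadBalancedWorkerPool {
  static final class Worker {
    final String id;
    final AtomicInteger bundlesInFlight = new AtomicInteger();
    Worker(String id) { this.id = id; }
  }

  private final List<Worker> workers;

  LoadBalancedWorkerPool(List<Worker> workers) { this.workers = workers; }

  // Pick the worker with the fewest bundles in flight instead of a fixed one.
  // (In real code the check-and-increment would need to be made atomic.)
  Worker acquire() {
    Worker least = workers.stream()
        .min(Comparator.comparingInt((Worker w) -> w.bundlesInFlight.get()))
        .orElseThrow(IllegalStateException::new);
    least.bundlesInFlight.incrementAndGet();
    return least;
  }

  // Called when the bundle completes, returning capacity to the pool.
  void release(Worker worker) {
    worker.bundlesInFlight.decrementAndGet();
  }
}
{code}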

 


> Load balance bundle processing w/ multiple SDK workers
> ------------------------------------------------------
>
>                 Key: BEAM-8816
>                 URL: https://issues.apache.org/jira/browse/BEAM-8816
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core, runner-flink
>    Affects Versions: 2.17.0
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: portability
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
