Thomas Weise created BEAM-8816:
----------------------------------
Summary: Load balance bundle processing w/ multiple SDK workers
Key: BEAM-8816
URL: https://issues.apache.org/jira/browse/BEAM-8816
Project: Beam
Issue Type: Improvement
Components: runner-core, runner-flink
Affects Versions: 2.17.0
Reporter: Thomas Weise
Assignee: Thomas Weise
We found skewed utilization of SDK workers causing excessive latency with
Streaming/Python/Flink. (Remember that with Python, we need to execute multiple
worker processes on a machine instead of relying on threads in a single worker,
which requires the runner to decide which worker receives each bundle for
processing.)
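For illustration only, here is a minimal Python sketch of choosing a worker per bundle instead of fixing the choice up front. It assumes a hypothetical `LeastLoadedSelector` that tracks how many bundles each worker currently has in flight and hands the next bundle to the least busy one. The class, its methods, and the worker IDs are hypothetical and are not part of Beam's runner or SDK harness API.

```python
from typing import Dict, List


class LeastLoadedSelector:
    """Hypothetical per-bundle worker selection by fewest in-flight bundles."""

    def __init__(self, worker_ids: List[str]) -> None:
        if not worker_ids:
            raise ValueError("need at least one worker")
        # Dicts preserve insertion order, so ties go to the earliest-registered worker.
        self._in_flight: Dict[str, int] = {w: 0 for w in worker_ids}

    def acquire(self) -> str:
        """Pick the worker for the next bundle and count the bundle as in flight."""
        # min() returns the first worker encountered with the lowest count.
        worker = min(self._in_flight, key=self._in_flight.__getitem__)
        self._in_flight[worker] += 1
        return worker

    def release(self, worker: str) -> None:
        """Mark one bundle on `worker` as finished."""
        if self._in_flight[worker] == 0:
            raise ValueError(f"{worker} has no bundle in flight")
        self._in_flight[worker] -= 1

    def in_flight(self, worker: str) -> int:
        return self._in_flight[worker]


if __name__ == "__main__":
    selector = LeastLoadedSelector(["worker-0", "worker-1", "worker-2"])
    print([selector.acquire() for _ in range(4)])  # ['worker-0', 'worker-1', 'worker-2', 'worker-0']
    selector.release("worker-1")
    print(selector.acquire())  # worker-1
```

Counting in-flight bundles is a proxy that needs no knowledge of per-record cost: a worker stuck on an expensive bundle keeps a high count and is passed over in favor of less busy workers until it catches up.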
The Flink runner has knobs to influence the number of records per bundle and
the maximum duration for a bundle. But since the runner does not understand the
cost of individual records, the duration of bundles can fluctuate
significantly due to skew in the processing time of individual records.
And unless the bundle size is 1, multiple expensive records can be assigned
to a single bundle before the cutoff time is reached. We noticed this with a
pipeline that executes models, but there are other use cases where the cost of
individual records can vary significantly.
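The effect can be reproduced with a small simulation. The sketch below is a hypothetical model, not the Flink runner's code: records are cut into bundles using only a maximum record count and a maximum elapsed time measured from when the bundle was opened (loosely mirroring the two knobs mentioned above), while the per-record processing cost, which the runner cannot see, only determines how long the SDK worker needs to finish each bundle. `cut_bundles`, `Bundle`, and the parameter names are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Bundle:
    costs_ms: List[int] = field(default_factory=list)

    @property
    def processing_ms(self) -> int:
        # Time the SDK worker needs to finish the bundle: the sum of per-record costs.
        return sum(self.costs_ms)


def cut_bundles(arrival_ms: List[int], costs_ms: List[int],
                max_bundle_size: int, max_bundle_time_ms: int) -> List[Bundle]:
    """Hypothetical bundling by count and elapsed time only.

    Cost is attached to each record for the simulation but is never used to
    decide where a bundle ends, because the runner cannot see it.
    """
    bundles: List[Bundle] = []
    current: Optional[Bundle] = None
    opened_at = 0
    for t, cost in zip(arrival_ms, costs_ms):
        # Close the open bundle when either limit has been reached.
        if current is not None and (
            len(current.costs_ms) >= max_bundle_size
            or t - opened_at >= max_bundle_time_ms
        ):
            bundles.append(current)
            current = None
        if current is None:
            current = Bundle()
            opened_at = t
        current.costs_ms.append(cost)
    if current is not None:
        bundles.append(current)
    return bundles


if __name__ == "__main__":
    arrivals = list(range(10))       # one record per millisecond
    costs = [1] * 5 + [500] * 5      # the last five records are expensive
    result = cut_bundles(arrivals, costs, max_bundle_size=5, max_bundle_time_ms=1000)
    print([b.processing_ms for b in result])  # [5, 2500]
```

Both bundles respect the count and time limits as the runner sees them, yet the second keeps its worker busy for 2.5 seconds, well past the 1-second limit, because all five expensive records were assigned before either cutoff was reached.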
Additionally, the Flink runner establishes the association between the subtask
managing an executable stage and the SDK worker during initialization, and that
association lasts for the duration of the job. In other words, bundles from a
given subtask of an executable stage will always be sent to the same SDK worker.
When the skew in execution time is tied to specific keys (stateful processing),
this aggravates the issue further, because the expensive keys keep landing on
the same worker.
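A toy simulation can make the key-skew point concrete. It assumes two SDK workers and two subtasks, with all bundles queued up front, where subtask 0 owns a hot key whose bundles are expensive. `static_assignment` pins subtask `i` to worker `i % num_workers` for the whole job, mirroring the fixed association described above; `least_busy_assignment` models a hypothetical alternative in which the next bundle goes to whichever worker becomes idle first (greedy list scheduling). The costs are only used to compute when each worker becomes free, which a real runner would observe rather than predict. All function names and numbers are illustrative assumptions.

```python
from typing import List, Tuple

# Each bundle is (subtask_index, processing_cost_ms).
BundleSpec = Tuple[int, int]


def static_assignment(bundles: List[BundleSpec], num_workers: int) -> List[int]:
    """Total busy time per worker when each subtask is pinned to one worker for the whole job."""
    busy = [0] * num_workers
    for subtask, cost_ms in bundles:
        busy[subtask % num_workers] += cost_ms
    return busy


def least_busy_assignment(bundles: List[BundleSpec], num_workers: int) -> List[int]:
    """Total busy time per worker when each bundle goes to the worker that becomes idle first."""
    busy = [0] * num_workers
    for _subtask, cost_ms in bundles:
        # With all bundles queued up front, the worker with the least accumulated
        # work is the first to become idle; ties go to the lowest worker index.
        target = busy.index(min(busy))
        busy[target] += cost_ms
    return busy


if __name__ == "__main__":
    # Subtask 0 owns a hot key (400 ms bundles); subtask 1 sees cheap 10 ms bundles.
    bundles = [(0, 400), (1, 10)] * 4
    static = static_assignment(bundles, num_workers=2)
    balanced = least_busy_assignment(bundles, num_workers=2)
    print(static, max(static))      # [1600, 40] 1600
    print(balanced, max(balanced))  # [820, 820] 820
```

Under static pinning the hot key keeps worker 0 busy for 1600 ms while worker 1 is idle after 40 ms; letting any idle worker take the next bundle roughly halves the makespan in this example.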
[https://lists.apache.org/thread.html/59c02d8b8ea849c158deb39ad9d83af4d8fcb56570501c7fe8f79bb2@%3Cdev.beam.apache.org%3E]