Hi Jan,

To generalize the per-stage parallelism configuration, we should have a FR
proposing the capability to explicitly set autoscaling (in this case, fixed
size per stage) policy in Beam pipelines.

Per-step or per-stage parallelism, or fusion/optimization is not part of
the Beam model. They are [Flink] runner implementation details and should
be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion looks like
until the pipeline is submitted to a runner, thus making configuration of
the parallelism/worker-per-stage not straightforward.
Flink's parallelism settings can be found here
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
it's still kind of a black box since you don't really know how many tasks
are actually spawned until you run a pipeline.

That being said, if we have a general interface controlling how a pipeline
scales, each runner could adapt [auto]scaling in their own way.
For example, in a Flink job, each operator/stage's task slot is prorated by
their key numbers; the maximum parallelism is throttled by task slot
utilization.
Another example, in a Dataflow job, each stage horizontally scales by CPU
utilization; vertically scales by memory/disk utilization.

+dev@beam.apache.org <dev@beam.apache.org>
Let's use this thread to discuss how to configure a pipeline for runners so
that they can scale workers appropriately without exposing runner-specific
details to the Beam model.

Ning.


On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Ning,
>
> I might have missed that in the discussion, but we talk about batch
> execution, am I right? In streaming, all operators (PTransforms) of a
> Pipeline are run in the same slots, thus the downsides are limited. You can
> enforce streaming mode using --streaming command-line argument. But yes,
> this might have other implications. For batch only it obviously makes sense
> to limit parallelism of a (fused) 'stage', which is not an transform-level
> concept, but rather a more complex union of transforms divided by shuffle
> barrier. Would you be willing to start a follow-up thread in @dev mailing
> list for this for deeper discussion?
>
>  Jan
> On 4/20/23 19:18, Ning Kang via user wrote:
>
> Hi Jan,
>
> The approach works when your pipeline doesn't have too many operators. And
> the operator that needs the highest parallelism can only use at most
> #total_task_slots / #operators resources available in the cluster.
>
> Another downside is wasted resources for other smaller operators who
> cannot make full use of task slots assigned to them. You might see only
> 1/10 tasks running while the other 9/10 tasks idle for an operator with
> parallelism 10, especially when it's doing some aggregation like a SUM.
>
> One redeeming method is that, for operators following another operator
> with high fanout, we can explicitly add a Reshuffle to allow a higher
> parallelism. But this circles back to the first downside: if your pipeline
> has exponentially high fanout through it, setting a single parallelism for
> the whole pipeline is not ideal because it limits the scalability of your
> pipeline significantly.
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> this topic was discussed many years ago and the conclusion there was that
>> setting the parallelism of individual operators via FlinkPipelineOptions
>> (or ResourceHints) is be possible, but would be somewhat cumbersome.
>> Although I understand that it "feels" weird to have high parallelism for
>> operators with small inputs, does this actually bring any relevant
>> performance impact? I always use parallelism based on the largest operator
>> in the Pipeline and this seems to work just fine. Is there any particular
>> need or measurable impact of such approach?
>>
>>  Jan
>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>
>> Same need here, using Flink runner. We are processing a pcollection
>> (extracting features per element) then combining these into groups of
>> features and running the next operator on those groups.
>>
>> Each group contains ~50 elements, so the parallelism of the operator
>> upstream of the groupby should be higher, to be balanced with the
>> downstream operator.
>>
>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Hi Reuven,
>>>
>>> It would be better to set parallelism for operators, as I mentioned
>>> before, there may be multiple groupby, join operators in one pipeline, and
>>> their parallelism can be different due to different input data sizes.
>>>
>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Jeff - does setting the global default work for you, or do you need
>>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>>
>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>> operators.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>> such kind of mechanism, so I am looking for a general solution on the 
>>>>>>> beam
>>>>>>> side.
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To a (small) degree Sparks “new” AQE might be able to help
>>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>>
>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>
>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the 
>>>>>>>>>>> operator
>>>>>>>>>>> runs, so there is no need to have such controls. In fact these 
>>>>>>>>>>> specific
>>>>>>>>>>> controls wouldn't make much sense for the way Dataflow implements 
>>>>>>>>>>> these
>>>>>>>>>>> operators.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in
>>>>>>>>>>>>>> operator level. And the input size of the operator is unknown at 
>>>>>>>>>>>>>> compiling
>>>>>>>>>>>>>> stage if it is not a source
>>>>>>>>>>>>>>  operator,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's an example of flink
>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>> Spark also support to set operator level parallelism (see 
>>>>>>>>>>>>>> groupByKey
>>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, 
>>>>>>>>>>>>>>> the number of
>>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is 
>>>>>>>>>>>>>>> designed to
>>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be 
>>>>>>>>>>>>>>> dynamically
>>>>>>>>>>>>>>> split and moved around between workers, the number of workers 
>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the 
>>>>>>>>>>>>>>> parallelism of
>>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Besides the global parallelism of beam job, is there any
>>>>>>>>>>>>>>>> way to set parallelism for individual operators like group by 
>>>>>>>>>>>>>>>> and join? I
>>>>>>>>>>>>>>>> understand the parallelism setting depends on the underlying 
>>>>>>>>>>>>>>>> execution
>>>>>>>>>>>>>>>> engine, but it is very common to set parallelism like group by 
>>>>>>>>>>>>>>>> and join in
>>>>>>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Jeff Zhang
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>

Reply via email to