Hi

As a use case, we have records being fetched from Kinesis as well as an S3 
(bounded) source in a unified pipeline, which is eventually flattened into a 
single projection/output for processing. We usually need far fewer task slots 
/ much less parallelism for the data coming in from Kinesis than for reading 
data from S3. So we end up assigning a parallelism of X that is way more than 
what the unbounded stream (Kinesis, in our case) actually needs. We are then 
forced either to provision as many shards as task slots to distribute the 
load, but shards = money w.r.t. AWS, or to stick with a fixed shard count, in 
which case we don't use the compute to its full potential and also frequently 
hit errors like rate limit exceeded within Kinesis itself.

So I was wondering if we can at least have a way to specify individual 
parallelism for some sources and sinks. I understand that individual 
parallelism for operators won't be easy, especially if it's targeted at a 
specific runner. But could the IOs have a pipeline option, perhaps extended 
from the FlinkRunner's pipeline options, which when set is used as their 
parallelism, and which defaults to the global value when the runner is Flink?
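To make the idea concrete, here is a rough sketch of the resolution logic such an option could use. Note that `resolve_parallelism` and `overrides` are names invented for illustration; this is not an existing Beam or Flink API:

```python
# Hypothetical sketch only: resolve an operator's parallelism from optional
# per-transform overrides, falling back to the runner's global default.
# None of these names exist in Beam today.

def resolve_parallelism(transform_name, overrides, global_parallelism):
    """Return the override for transform_name if one is set, else the global value."""
    return overrides.get(transform_name, global_parallelism)

# e.g. a global parallelism of 8, but the Kinesis source only needs 2
overrides = {'ReadFromKinesis': 2}
print(resolve_parallelism('ReadFromKinesis', overrides, 8))  # -> 2
print(resolve_parallelism('ReadFromS3', overrides, 8))       # -> 8
```

With a global parallelism of 8, the Kinesis source could then be pinned to 2 while the S3 read keeps the default.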


Thanks
Akshay I.







From: amit kumar <akdata...@gmail.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Monday, June 29, 2020 at 2:47 PM
To: "dev@beam.apache.org" <dev@beam.apache.org>
Subject: Re: Individual Parallelism support for Flink Runner


Looks like Flink already supports this natively at the operator level:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#operator-level

Regards,
Amit

On Mon, Jun 29, 2020 at 12:59 PM Kenneth Knowles <k...@apache.org> wrote:
This exact issue has been discussed before, though I can't find the older 
threads. Basically, specifying parallelism is a workaround (aka a cost), not a 
feature (aka a benefit). Sometimes you have to pay that cost as it is the only 
solution currently understood or implemented. It depends on what your reason is 
for having to set parallelism.

A lot of the time, the parallelism is a property of the combination of the 
pipeline and the data. The same pipeline with different data should have this 
tuned differently. For composite transforms in a library (not the top level 
pipeline) this is even more likely. It sounds like the suggestions here fit 
this case.

Some of the time, max parallelism has to do with not overwhelming another 
service. This depends on the particular endpoint. That is usually 
construction-time information. In this case you want to have portable mandatory 
limits.

Could you clarify your use case?

Kenn

On Mon, Jun 29, 2020 at 8:58 AM Luke Cwik <lc...@google.com> wrote:
Check out this thread[1] about adding "runner determined sharding" as a general 
concept. This could be used to enhance the reshuffle implementation 
significantly and might remove the need for per transform parallelism from that 
specific use case and likely from most others.

1: 
https://lists.apache.org/thread.html/rfd1ca93268eb215fbbcfe098c1dfb330f1b84fb89673325135dfd9a8%40%3Cdev.beam.apache.org%3E

On Mon, Jun 29, 2020 at 4:03 AM Maximilian Michels <m...@apache.org> wrote:
We could allow parameterizing transforms by using transform identifiers
from the pipeline, e.g.


   options = ['--parameterize=MyTransform;parallelism=5']
   with beam.Pipeline(options=PipelineOptions(options)) as p:
     p | beam.Create([1, 2, 3]) | 'MyTransform' >> beam.ParDo(..)
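For illustration, the `--parameterize` values above could be parsed into per-transform hints along these lines (a sketch only; `parse_parameterize` is not an existing Beam function, and the option format is Max's proposal, not a shipped feature):

```python
# Sketch (illustrative only): parse --parameterize values of the form
# "TransformName;key=value[;key=value...]" into a dict of per-transform hints.

def parse_parameterize(values):
    hints = {}
    for value in values:
        transform, *params = value.split(';')
        hints[transform] = {}
        for param in params:
            key, _, val = param.partition('=')
            hints[transform][key] = val
    return hints

print(parse_parameterize(['MyTransform;parallelism=5']))
# -> {'MyTransform': {'parallelism': '5'}}
```

A runner could then look up each transform's unique label in the resulting dict and, as Max notes, simply ignore hints it does not understand.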


Those hints should always be optional, such that a pipeline continues to
run on all runners.

-Max

On 28.06.20 14:30, Reuven Lax wrote:
> However such a parameter would be specific to a single transform,
> whereas maxNumWorkers is a global parameter today.
>
> On Sat, Jun 27, 2020 at 10:31 PM Daniel Collins <dpcoll...@google.com> wrote:
>
>     I could imagine for example, a 'parallelismHint' field in the base
>     parameters that could be set to maxNumWorkers when running on
>     dataflow or an equivalent parameter when running on flink. It would
>     be useful to get a default value for the sharding in the Reshuffle
>     changes here https://github.com/apache/beam/pull/11919, but more
>     generally to have some decent guess on how to best shard work. Then
>     it would be runner-agnostic; you could set it to something like
>     numCpus on the local runner for instance.
>
>     On Sat, Jun 27, 2020 at 2:04 AM Reuven Lax <re...@google.com> wrote:
>
>         It's an interesting question - this parameter is clearly very
>         runner specific (e.g. it would be meaningless for the Dataflow
>         runner, where parallelism is not a static constant). How should
>         we go about passing runner-specific options per transform?
>
>         On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar <aiyan...@godaddy.com> wrote:
>
>             Hi beam community,
>
>             So I had brought this issue up in our Slack channel, but I
>             guess it warrants a deeper discussion and, if we do go
>             ahead, a plan of action for it.
>
>             Basically, the Flink runner currently doesn't support the
>             operator-level parallelism that native Flink provides OOTB.
>             So I was wondering what the community feels about having
>             some way to pass parallelism for individual operators, esp.
>             for some of the existing IOs.
>
>             Wanted to know what people think of this.
>
>             Thanks
>
>             Akshay I
>
