Hi Kenn,

Thanks for the response.

I haven't hit any specific issue yet. I think that if the IO connector
implementation takes parallelism into account, runners can parallelize
the primitive transforms in the connector (key-partitioned for GBK and
stateful ParDo, round-robin for stateless ParDo). For example, TextIO
first writes a temp file for every bundle, then uses a void key to
prevent parallelism, and then finalizes the result. That should work
properly in a distributed environment.
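As an aside, the temp-file-per-bundle pattern above can be sketched in
plain Python (this is not Beam code; BundleFileWriter, process_element,
finish_bundle, and finalize are hypothetical stand-ins for a write DoFn's
@ProcessElement/@FinishBundle methods and the single-key finalize step):

```python
# Plain-Python sketch (NOT the Beam API) of the pattern described above:
# each bundle buffers elements and flushes them to its own temp file in
# finish_bundle; a single non-parallel finalize step then merges the
# temp files into the final output.
import os
import tempfile

class BundleFileWriter:
    """Stand-in for a write DoFn: buffer per bundle, flush on finish."""

    def __init__(self, temp_dir):
        self.temp_dir = temp_dir
        self.buffer = []

    def process_element(self, element):
        # Analogue of @ProcessElement: only buffer, don't write yet.
        self.buffer.append(element)

    def finish_bundle(self):
        # Analogue of @FinishBundle: one temp file per bundle,
        # regardless of how many elements the bundle contained.
        fd, path = tempfile.mkstemp(dir=self.temp_dir, suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            f.write("\n".join(self.buffer) + "\n")
        self.buffer = []
        return path

def finalize(temp_paths, final_path):
    """Runs once (the 'void key' step): merge temp files, then clean up."""
    with open(final_path, "w") as out:
        for p in sorted(temp_paths):
            with open(p) as f:
                out.write(f.read())
            os.remove(p)
```

The point of the sketch is that elementwise parallelism doesn't produce one
shard per element: the number of temp files tracks the number of bundles,
and only the final merge is forced to be sequential.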

But applications can provide arbitrary custom IO connectors, and the
runner does not know whether a given connector can be safely
parallelized. Can I assume that it is the application's responsibility
to make sure its IO connector works correctly when run in parallel?

Thanks,
Shen

On Tue, Apr 3, 2018 at 6:11 PM, Kenneth Knowles <k...@google.com> wrote:

> The runner should generally not need to be aware of any getNumShard() API
> on a connector. The connector itself is probably a composite transform
> (with a ParDo or two or three somewhere doing the actual writes) and should
> be designed to expose available parallelism. Specifying the number of
> shards usually limits the parallelism, whereas leaving it unset lets
> the runner use the maximum allowed parallelism.
>
> If the connector does a GBK to gather input elements into a single
> iterable, then it is a single element and cannot be processed in parallel
> (except through splittable DoFn, but in that case you may not need to do
> the GBK in the first place). Conversely, if the connector does not do
> a GBK to gather input elements, then the runner is permitted to bundle
> them any way it wants and process all of them as though in parallel
> (except for stateful DoFn, in which case you probably don't need the GBK).
>
> Bundling is an important way that this works, too, since the @FinishBundle
> method is really a "flush" method, with @ProcessElement perhaps buffering
> up elements to be written to e.g. the same file shard. It is not this
> simple in practice but that gives the idea of how even with unrestricted
> elementwise parallelism you don't get one shard per element.
>
> These are all just ideas, and I'm not the connector expert. But I think
> the TL;DR is that a runner shouldn't need to know this - have you hit
> specific issues with a particular connector? That could make this a very
> productive discussion.
>
> Kenn
>
> On Mon, Apr 2, 2018 at 1:41 PM Shen Li <cs.she...@gmail.com> wrote:
>
>> Hi,
>>
>> It seems that there is no Sink base class. Some IO connectors (e.g.,
>> KafkaIO and TextIO) provide a getNumShard() API. But it is not generally
>> available for all existing Beam IO connectors and potential custom ones.
>> Although some IO connectors are implemented using ParDo/GBK, it is
>> unclear whether
>> the runner can directly parallelize those transforms (e.g., what if it only
>> writes to a single file). Is there a general way for runners to take
>> advantage of sink parallelism?
>>
>> Thanks,
>> Shen
>>
>>
>>