Thanks for the explanation!

Shen

On Wed, Apr 4, 2018 at 12:38 AM, Eugene Kirpichov <kirpic...@google.com>
wrote:

> Hi Shen,
>
> There is no "IO connector API" in Beam (not counting the deprecated Source
> API), IO is merely an informal term for a PTransform that interacts in some
> way with some external storage system. So whatever question you're asking
> about IO connectors, you might as well be asking it about PTransforms in
> general. See https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>
> To answer your question, then: is it the responsibility of a PTransform author
> to make sure their code works correctly when different elements of various
> PCollections are processed by downstream ParDos in parallel? Yes, of
> course.
>
> Things like "writing to a single file" are simply implemented by
> non-parallel code - e.g. GBK the data onto a single key, and write a ParDo
> that takes the single KV<K, Iterable<V>> and writes the Iterable to the
> file. This is, by definition, sequential (modulo windowing/triggering -
> different windows and different firings for the same key can still be
> processed in parallel).
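
The "GBK onto a single key, then write the Iterable" pattern described above can be modeled in plain Java. This is only a rough sketch and deliberately does not use the Beam SDK: the class SingleKeyWriteSketch, its methods, and the constant key "the-only-key" are hypothetical names made up for illustration, and a StringBuilder stands in for the output file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the pattern, NOT Beam code. All names are hypothetical.
class SingleKeyWriteSketch {

    // Models GroupByKey: gathers all values that share a key into one list.
    static <K, V> Map<K, List<V>> groupByKey(List<V> input, Function<V, K> keyFn) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (V value : input) {
            grouped.computeIfAbsent(keyFn.apply(value), k -> new ArrayList<>()).add(value);
        }
        return grouped;
    }

    // Models the writing ParDo: it receives the whole Iterable for one key
    // and writes it sequentially. The StringBuilder stands in for a file.
    static String writeAll(Iterable<String> lines) {
        StringBuilder file = new StringBuilder();
        for (String line : lines) {
            file.append(line).append('\n');
        }
        return file.toString();
    }

    // Every element gets the same key, so the grouped result has exactly one
    // entry and the write step runs once, with nothing left to parallelize.
    static String writeToSingleFile(List<String> elements) {
        Map<String, List<String>> grouped = groupByKey(elements, e -> "the-only-key");
        StringBuilder out = new StringBuilder();
        for (List<String> values : grouped.values()) {
            out.append(writeAll(values));
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(writeToSingleFile(Arrays.asList("a", "b", "c")));
    }
}
```

The model keeps input order only because it uses in-memory lists; it says nothing about the ordering a real runner would give the grouped Iterable.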
>
> On Tue, Apr 3, 2018 at 8:56 PM Shen Li <cs.she...@gmail.com> wrote:
>
>> Hi Kenn,
>>
>> Thanks for the response.
>>
>> I haven't hit any specific issue yet. I think if the IO connector
>> implementation does take parallelism into consideration, runners can
>> parallelize primitive transforms in the connector (key-partitioned for GBK
>> and stateful ParDo, and round robin for stateless ParDo). For example,
>> TextIO first writes a temp file for every bundle, then uses a void key to
>> prevent parallelism, and then finalizes the result. It should work properly
>> in a distributed environment.
>>
>> But applications can provide any custom IO connectors, and the runner
>> does not know whether a connector can be safely parallelized. Can I assume
>> that it is the applications' responsibility to make sure their IO connector
>> works correctly when running in parallel?
>>
>> Thanks,
>> Shen
>>
>> On Tue, Apr 3, 2018 at 6:11 PM, Kenneth Knowles <k...@google.com> wrote:
>>
>>> The runner should generally not need to be aware of any getNumShard()
>>> API on a connector. The connector itself is probably a composite transform
>>> (with a ParDo or two or three somewhere doing the actual writes) and should
>>> be designed to expose available parallelism. Specifying the number of
>>> shards actually usually limits the parallelism, versus letting the runner
>>> use the maximum allowed parallelism.
>>>
>>> If the connector does a GBK to gather input elements into a single
>>> iterable, then it is a single element and cannot be processed in parallel
>>> (except through splittable DoFn, but in that case you may not need to do
>>> the GBK in the first place). Conversely, if the connector does
>>> not do a GBK to gather input elements, then the runner is permitted to
>>> bundle them any way it wants and process all of them as though in parallel
>>> (except for stateful DoFn, in which case you probably don't need the GBK).
>>>
>>> Bundling is an important way that this works, too, since the
>>> @FinishBundle method is really a "flush" method, with @ProcessElement
>>> perhaps buffering up elements to be written to e.g. the same file shard. It
>>> is not this simple in practice but that gives the idea of how even with
>>> unrestricted elementwise parallelism you don't get one shard per element.
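
The buffering idea in the paragraph above (process-element buffers, finish-bundle flushes) can be sketched in plain Java. This is a simplified model under stated assumptions, not Beam code: BundleFlushSketch, BufferingWriter, and the fixed bundleSize are hypothetical, and a real runner chooses bundle boundaries itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of "buffer per element, flush per bundle". NOT Beam code.
class BundleFlushSketch {

    // Stand-in for a DoFn: processElement plays the role of @ProcessElement,
    // finishBundle plays the role of @FinishBundle.
    static class BufferingWriter {
        private final List<String> buffer = new ArrayList<>();
        private final List<List<String>> shards; // stand-in for written file shards

        BufferingWriter(List<List<String>> shards) {
            this.shards = shards;
        }

        void processElement(String element) {
            buffer.add(element); // only buffer, no write yet
        }

        void finishBundle() {
            if (!buffer.isEmpty()) {
                shards.add(new ArrayList<>(buffer)); // one shard per flushed bundle
                buffer.clear();
            }
        }
    }

    // Hypothetical "runner": cuts the input into bundles of bundleSize elements
    // (bundleSize must be positive) and calls finishBundle after each one.
    static List<List<String>> run(List<String> elements, int bundleSize) {
        List<List<String>> shards = new ArrayList<>();
        BufferingWriter writer = new BufferingWriter(shards);
        for (int i = 0; i < elements.size(); i++) {
            writer.processElement(elements.get(i));
            boolean endOfBundle = (i + 1) % bundleSize == 0 || i == elements.size() - 1;
            if (endOfBundle) {
                writer.finishBundle();
            }
        }
        return shards;
    }

    public static void main(String[] args) {
        List<List<String>> shards = run(Arrays.asList("a", "b", "c", "d", "e"), 2);
        System.out.println(shards.size() + " shards: " + shards);
    }
}
```

With five elements and bundles of two, the model produces three shards rather than five, which is the point of the paragraph: the shard count follows the bundling, not the element count.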
>>>
>>> These are all just ideas, and I'm not the connector expert. But I think
>>> the TL;DR is that a runner shouldn't need to know this - have you hit
>>> specific issues with a particular connector? That could make this a very
>>> productive discussion.
>>>
>>> Kenn
>>>
>>> On Mon, Apr 2, 2018 at 1:41 PM Shen Li <cs.she...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> It seems that there is no Sink base class. Some IO connectors (e.g.,
>>>> KafkaIO and TextIO) provide a getNumShard() API. But it is not generally
>>>> available for all existing Beam IO connectors and potential custom ones.
>>>> Although some IO connectors are implemented using ParDo/GBK, it is unclear
>>>> whether the runner can directly parallelize those transforms (e.g., what if
>>>> it only writes to a single file). Is there a general way for runners to
>>>> take advantage of sink parallelism?
>>>>
>>>> Thanks,
>>>> Shen