Thanks for the explanation!

Shen
On Wed, Apr 4, 2018 at 12:38 AM, Eugene Kirpichov <[email protected]> wrote:

> Hi Shen,
>
> There is no "IO connector API" in Beam (not counting the deprecated Source
> API); IO is merely an informal term for a PTransform that interacts in some
> way with an external storage system. So whatever question you're asking
> about IO connectors, you might as well be asking it about PTransforms in
> general. See
> https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>
> To answer your question, then: is it the responsibility of a PTransform
> author to make sure their code works correctly when different elements of
> various PCollections are processed by downstream ParDos in parallel? Yes,
> of course.
>
> Things like "writing to a single file" are simply implemented by
> non-parallel code - e.g., GBK the data onto a single key, and write a ParDo
> that takes the single KV<K, Iterable<V>> and writes the Iterable to the
> file. This is, by definition, sequential (modulo windowing/triggering -
> different windows and different firings for the same key can still be
> processed in parallel).
>
> On Tue, Apr 3, 2018 at 8:56 PM Shen Li <[email protected]> wrote:
>
>> Hi Kenn,
>>
>> Thanks for the response.
>>
>> I haven't hit any specific issue yet. I think if the IO connector
>> implementation takes parallelism into consideration, runners can
>> parallelize primitive transforms in the connector (key-partitioned for
>> GBK and stateful ParDo, and round robin for stateless ParDo). For
>> example, TextIO first writes a temp file for every bundle, then uses a
>> void key to prevent parallelism, and then finalizes the result. It
>> should work properly in a distributed environment.
>>
>> But applications can provide custom IO connectors, and the runner does
>> not know whether a connector can be safely parallelized.
>> Can I assume that it is the application's responsibility to make sure
>> its IO connector works correctly when running in parallel?
>>
>> Thanks,
>> Shen
>>
>> On Tue, Apr 3, 2018 at 6:11 PM, Kenneth Knowles <[email protected]> wrote:
>>
>>> The runner should generally not need to be aware of any getNumShard()
>>> API on a connector. The connector itself is probably a composite
>>> transform (with a ParDo or two or three somewhere doing the actual
>>> writes) and should be designed to expose available parallelism.
>>> Specifying the number of shards usually limits the parallelism, versus
>>> letting the runner use the maximum allowed parallelism.
>>>
>>> If the connector does a GBK to gather input elements into a single
>>> iterable, then that is a single element and cannot be processed in
>>> parallel (except through splittable DoFn, but in that case you may not
>>> need the GBK in the first place). Conversely, if the connector does not
>>> do a GBK to gather input elements, then the runner is permitted to
>>> bundle them any way it wants and process all of them as though in
>>> parallel (except for stateful DoFn, in which case you probably don't
>>> need the GBK).
>>>
>>> Bundling is an important way that this works, too, since the
>>> @FinishBundle method is really a "flush" method, with @ProcessElement
>>> perhaps buffering up elements to be written to, e.g., the same file
>>> shard. It is not this simple in practice, but that gives the idea of
>>> how even with unrestricted elementwise parallelism you don't get one
>>> shard per element.
>>>
>>> These are all just ideas, and I'm not the connector expert. But I think
>>> the TL;DR is that a runner shouldn't need to know this - have you hit
>>> specific issues with a particular connector? That could make this a
>>> very productive discussion.
>>>
>>> Kenn
>>>
>>> On Mon, Apr 2, 2018 at 1:41 PM Shen Li <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> It seems that there is no Sink base class.
>>>> Some IO connectors (e.g., KafkaIO and TextIO) provide a getNumShard()
>>>> API, but it is not generally available for all existing Beam IO
>>>> connectors, let alone potential custom ones. Although some IO
>>>> connectors are implemented using ParDo/GBK, it is unclear whether the
>>>> runner can directly parallelize those transforms (e.g., what if the
>>>> connector only writes to a single file?). Is there a general way for
>>>> runners to take advantage of sink parallelism?
>>>>
>>>> Thanks,
>>>> Shen
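[Editor's note: a plain-Python sketch of the single-key write pattern Eugene describes above - key everything with one void key, group, then write the single grouped iterable sequentially. The helper names (`group_by_key`, `write_with_single_key`) are illustrative stand-ins, not the Beam API.]

```python
from collections import defaultdict

def group_by_key(pairs):
    """Stand-in for Beam's GroupByKey: (k, v) pairs -> {k: [v, ...]}."""
    grouped = defaultdict(list)
    for k, v in pairs:
        grouped[k].append(v)
    return dict(grouped)

def write_with_single_key(elements, write_line):
    # Step 1: assign the same void key (None) to every element.
    keyed = [(None, e) for e in elements]
    # Step 2: GBK now yields exactly one key -> one KV<None, Iterable<V>>.
    grouped = group_by_key(keyed)
    # Step 3: a single "ParDo" invocation consumes the whole iterable,
    # so the file write is sequential by construction.
    for _key, values in grouped.items():
        for v in values:
            write_line(v)

lines = []
write_with_single_key(["a", "b", "c"], lines.append)
```

Because there is only one key, a runner has no element-level parallelism to exploit in the write step, which is exactly the point of the pattern.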
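[Editor's note: a plain-Python sketch of the bundling point Kenn makes - buffer in the per-element method and flush once per bundle, mirroring Beam's @ProcessElement/@FinishBundle lifecycle. `BufferingWriteFn` and the toy `run_bundles` runner are illustrative, not the Beam API.]

```python
class BufferingWriteFn:
    """Buffers elements and flushes one "shard" per bundle."""

    def __init__(self, flush):
        self._flush = flush   # called once per bundle with the buffered batch
        self._buffer = []

    def process_element(self, element):
        # Analogue of @ProcessElement: just buffer, don't write yet.
        self._buffer.append(element)

    def finish_bundle(self):
        # Analogue of @FinishBundle: flush the whole bundle's batch at once.
        if self._buffer:
            self._flush(list(self._buffer))
            self._buffer.clear()

def run_bundles(bundles, fn):
    # A toy runner: process each bundle's elements, then finish the bundle.
    for bundle in bundles:
        for element in bundle:
            fn.process_element(element)
        fn.finish_bundle()

shards = []
run_bundles([[1, 2, 3], [4, 5]], BufferingWriteFn(shards.append))
# Five elements, but only two flushes: one per bundle, not one per element.
```

This is how unrestricted elementwise parallelism still avoids producing one shard per element: the shard count tracks bundles, which the runner controls.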
