I agree at the level of GroupIntoBatches. The similarity appears if you implement GroupIntoBatches with a new primitive supporting runner-scoped state, where the state is per-shard and the runner is able to split state shards.
Kenn

On Tue, Jun 30, 2020 at 9:22 AM Luke Cwik <lc...@google.com> wrote:

> I'm not sure that runner determined sharding/GroupIntoBatches applies to splittable DoFns. Splittable DoFns are about taking one element and breaking a high fan-out / high cost function down into smaller pieces, while runner determined sharding/GroupIntoBatches is about taking a lot of small elements and doing something with all of them together at once (e.g. a service call).
>
> On Mon, Jun 29, 2020 at 3:00 PM Siyuan Chen <syc...@google.com> wrote:
>
>> --
>> Best regards,
>> Siyuan
>>
>> On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Great doc.
>>>
>>> The extended API makes sense. I like how it removes a knob. The question I have to ask is whether there is a core model change here, or whether we can avoid one. Defining "shard" as a scope of state within which execution is observably serial, today the model always shards by key+window. The two options mentioned seem to be:
>>>
>>> - a new primitive way for the runner to inject shard keys, so a standard stateful ParDo works
>>> - a new primitive stateful ParDo with a runner-defined scope of state that is not observable to users
>>>
>>> It seems from the doc that the latter is favored. I like it better, too. I do not know how dynamic resharding would work for either of these. With dynamic resharding, the latter seems related to SDF.
>>
>> The runner-side implementation can be the same for both cases. One way would be to generate a shard number for a given input key when the data is emitted by an upstream transform and passed back to the runner; the runner can then distribute the data to the corresponding shard for processing GroupIntoBatches. It is dynamic in that data with the same key emitted at different times can be tagged with different shard numbers.
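As a rough illustration of the shard-number idea above, here is a plain-Java sketch (all names are hypothetical; this is not Beam API code): an upstream tagger appends a round-robin shard number to each key as elements are emitted, so the same user key can map to different shards over time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: tag each emitted element's key with a shard number.
// Elements with the same user key emitted at different times may receive
// different shard numbers, which is what makes the sharding "dynamic".
class DynamicSharder {
    private final int numShards;
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    DynamicSharder(int numShards) {
        this.numShards = numShards;
    }

    /** Returns a composite "shard key", e.g. "user-key#3". */
    String assignShard(String userKey) {
        long n = counters.computeIfAbsent(userKey, k -> new AtomicLong()).getAndIncrement();
        return userKey + "#" + (n % numShards); // round-robin per key
    }
}
```

The runner would group on the composite key, so each of the `numShards` shards of a hot key can be processed in parallel.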
>> SDF is mainly for sources AFAIU, so it is different (I am not familiar with it, so correct me if I am wrong :p).
>>
>>> Suppose we say that the semantics of the URN+payload "GroupIntoBatch { maxSize = <n> }" allow any segmentation of the PCollection into batches of size <= n. The runner can replace it with the proposed implementation. But what does the default implementation look like? If the default implementation is not good (like, say, "reshuffle"), then maybe figuring out the primitive and adding it is better than leaving a transform that is sort of a hack when not replaced.
>>
>> It's a good point that we should think more carefully when extending the API. The default implementation of `ofSize()` or other similar limits like `ofBytes()` would behave like a normal stateful DoFn. (`ofMaxSize` sounds similar to `ofSize()`, since `ofSize()` guarantees emitting a fixed number of elements whenever possible and emitting a partial result otherwise.) Enabling runner determined sharding on a runner that does not support dynamic sharding would fall back to the current (default) implementation, so there would be no behavioral change from today. Allowing the transform to accept unkeyed input might have to use naive sharding, though, e.g. pairing each element with a random key.
>>
>>> Kenn
>>>
>>> On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <tyso...@google.com> wrote:
>>>>
>>>>> Nice doc by the way, it's concise. Thanks for sharing, and I'm excited to see this feature, particularly the PCollection<T> variant that would have been useful for the recently introduced Cloud AI transforms.
>>>>>
>>>>> On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com> wrote:
>>>>>
>>>>>> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com> wrote:
>>>>>>
>>>>>>> Thanks Luke!
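A minimal plain-Java sketch (not the actual Beam implementation) of the per-key buffering that the default `ofSize()` behavior above describes: a full batch is emitted whenever `n` elements have accumulated, and whatever remains is emitted as a partial batch on flush (e.g. when the window expires).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the per-key buffering a default (non-sharded)
// GroupIntoBatches.ofSize(n) performs with a stateful DoFn: buffer values
// in state and emit a batch once n elements have arrived; anything left
// over is emitted as a partial batch when flush() is called.
class BatchBuffer<V> {
    private final int maxSize;
    private final List<V> buffer = new ArrayList<>();

    BatchBuffer(int maxSize) { this.maxSize = maxSize; }

    /** Returns a full batch when the buffer reaches maxSize, else null. */
    List<V> add(V value) {
        buffer.add(value);
        if (buffer.size() >= maxSize) {
            List<V> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return null;
    }

    /** Emits whatever is buffered, mimicking the on-expiry partial batch. */
    List<V> flush() {
        List<V> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

In the sharded variant, one such buffer would exist per (key, shard) pair rather than per key.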
>>>>>>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are faced with similar issues in some sinks commonly used by Dataflow, such as the streaming BigQuery sink. Basically, elements are grouped and batched so that some subsequent (potentially expensive) operations can be performed at once on a batch of elements. One problem with the grouping is that it can impose a limit on the parallelism of the DoFn performing those operations. To mitigate the limited parallelism, I have recently been looking into improving the `GroupIntoBatches` transform to allow the grouping to be dynamically sharded and therefore distributed - essentially a "shardable" stateful DoFn. The transform already does grouping and batching on the input KV - grouping on K and batching on V - and it could be extended to shard the key and do batching within each shard (meaning that we would have a sharded form of keys somewhere). The idea is detailed in https://s.apache.org/sharded-group-into-batches
>>>>>>>
>>>>>>> Along with the proposal, there are two points I would like to ask for advice on:
>>>>>>> - Would there be cases where the sharded keys need to be visible to users? One such case might be applying another stateful DoFn to the sharded output of the GroupIntoBatches, so that the semantics of the key-to-user-state mapping are respected.
>>>>>>
>>>>>> Does exposing an API for the sharded keys change the implementation of the feature? If it is only an API change, I think it would be best to avoid exposing the keys to start with, to avoid any unnecessary dependency on the implementation.
>>>>>> At this point it seems like exposing them could unnecessarily make it more difficult to modify the sharding implementation in the future.
>>>>
>>>> Exposing the shard id makes it such that the shard id must cross the portability APIs during execution. If we don't expose it, then it could be implemented completely within the runner, and all the SDK has to do is say that this stateful transform supports sharding, or that this transform is the GroupIntoBatches transform.
>>>>
>>>>>>> - Would a per-element shard id be needed, or would a per-bundle shard id be sufficient? The former is more general, and we could still have the same shard id for all elements in a bundle. But the conclusion would potentially affect the implementation (like how the sharding information should be passed across the Fn API, for example).
>>>>>>
>>>>>> Are you referring to an API for a pipeline author to get the shard id? I thought a bundle isn't a pipeline author abstraction but an implementation detail; I may be wrong, since I'm not too familiar with this area of the code. In the proposal it looks like the shard id isn't exposed. I prefer this, as I'm not sure there is any value for the user in having a specific "shard id". Is there?
>>>>
>>>> A bundle is exposed to a pipeline author, since they regularly want to know the lifetime of a bundle (e.g. startBundle/finishBundle on DoFn) to amortize setup/teardown at the bundle level.
>>>>
>>>> Exposing the shard id is the big question: is there a use case for it? I hope the community can provide guidance here; otherwise I'm with you and also agree that exposing it isn't necessary.
>>>>
>>>>>>> I'm very new to Beam, so I'm looking forward to hearing the thoughts of the community.
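A plain-Java sketch (hypothetical names, not the actual Beam DoFn API) of the bundle-level amortization mentioned above: an expensive resource is acquired once in startBundle and released once in finishBundle, so its cost is spread over all elements in the bundle instead of being paid per element.

```java
// Hypothetical sketch of why a pipeline author cares about bundle lifetime:
// setup/teardown of an expensive resource happens once per bundle, and every
// element processed in that bundle reuses it.
class AmortizingFn {
    private StringBuilder connection; // stand-in for an expensive resource
    private int bundlesOpened = 0;

    void startBundle() {
        connection = new StringBuilder(); // expensive setup happens here
        bundlesOpened++;
    }

    void processElement(String element) {
        connection.append(element);       // reuse the per-bundle resource
    }

    String finishBundle() {
        String result = connection.toString();
        connection = null;                // expensive teardown happens here
        return result;
    }

    int bundlesOpened() { return bundlesOpened; }
}
```

A per-bundle shard id would fit naturally into this lifecycle, since it could be fixed at startBundle time.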
>>>>>>> Any comments will be appreciated :)
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> My first idea was to use a connection pool that is shared across the entire worker, across multiple bundles. The connection pool would TTL connections that have been unused. This helps a bunch, up until you hit the problem where you don't want every worker connected to every resource because of the sharding of the work. In that case we should really be making sure that workers that have processed the same "key" process the same "key" again, without limiting the number of workers that can process a specific key. This is very similar to what we do with a stateful DoFn, but one where the runner knows that it can "shard" the key. +Siyuan Chen <syc...@google.com> has been investigating something like this for Dataflow to solve scalability issues with the BigQuery sink, and has been looking into how a better GroupIntoBatches and/or a sharded stateful DoFn could really help in these situations. This applies in general to lots of things where we want to co-locate things with the same key but not limit the parallel processing to a single worker, as stateful DoFn does today.
>>>>>>>>
>>>>>>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> We have been promoting the use of DoFn to write IO connectors for many reasons, including better composability. A common pattern that arises in such IOs is that a preceding transform prepares the specification element on split, which a subsequent DoFn uses to read the data.
>>>>>>>>> You can see an example of this in FileIO [1] or in RedisIO [2].
>>>>>>>>>
>>>>>>>>> The issue is that if we process that spec in the `@ProcessElement` method, we lose the DoFn lifecycle: we cannot establish a connection in `@Setup` and close it in `@Teardown`, because the spec is per element. So we end up re-creating connections, which is a quite costly operation in some systems like Cassandra/HBase/etc., and it could end up saturating the data store because of the massive creation of connections (something that already happened in the past with JdbcIO in the streaming case).
>>>>>>>>>
>>>>>>>>> In the ongoing PR that transforms Cassandra to be DoFn based [3], this subject appeared again, and we were discussing how to eventually reuse connections, maybe with a pretty naive approach of saving a previous connection (or a set of identified connections) statically so it can be reused by multiple DoFn instances. We already had issues in the past with creating many connections in other IOs (JdbcIO) with streaming pipelines, where databases were swamped by massive amounts of connections, so reusing connections seems to be something that matters, but at the moment we do not have a clear way to do this better.
>>>>>>>>>
>>>>>>>>> Anyone have better ideas or recommendations for this scenario? Thanks in advance.
>>>>>>>>> Ismaël
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>>>>>>> [2] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>>>>>>> [3] https://github.com/apache/beam/pull/10546
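Putting the two connection-reuse ideas from this thread together, here is a hedged plain-Java sketch (the `ConnectionPool` type and its methods are hypothetical, with `Object` standing in for a real client connection): a worker-wide map keyed by connection spec, so that multiple DoFn instances and bundles share connections, combined with Luke's TTL idea so idle connections are eventually dropped.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: connections are cached per connection spec in a
// worker-wide map so many DoFn instances (and bundles) reuse them, and
// entries idle longer than a TTL are evicted so unused connections do not
// linger and saturate the data store.
class ConnectionPool {
    static class Entry {
        final Object connection;
        volatile long lastUsedMillis;
        Entry(Object c, long now) { connection = c; lastUsedMillis = now; }
    }

    private final Map<String, Entry> pool = new ConcurrentHashMap<>();
    private final long ttlMillis;

    ConnectionPool(long ttlMillis) { this.ttlMillis = ttlMillis; }

    /** Returns the cached connection for this spec, creating it if absent. */
    Object acquire(String spec, long nowMillis) {
        Entry e = pool.computeIfAbsent(spec, s -> new Entry(new Object(), nowMillis));
        e.lastUsedMillis = nowMillis;
        return e.connection;
    }

    /** Drops entries unused for longer than the TTL; returns evicted count. */
    int evictIdle(long nowMillis) {
        int evicted = 0;
        for (Iterator<Map.Entry<String, Entry>> it = pool.entrySet().iterator(); it.hasNext();) {
            if (nowMillis - it.next().getValue().lastUsedMillis > ttlMillis) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    int size() { return pool.size(); }
}
```

In a real IO the pool would be held in a static field (the "naive approach" Ismaël mentions), a real connection would be closed on eviction, and eviction would run on a background timer rather than being called explicitly.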