I'm not sure that runner-determined sharding/GroupIntoBatches applies to splittable DoFns. A splittable DoFn is about taking one element with a high fan-out / high-cost function and breaking it down into smaller pieces, while runner-determined sharding/GroupIntoBatches is about taking a lot of small elements and doing something with all of them together at once (e.g. a service call).
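[The many-small-elements-into-one-batch pattern described above can be sketched in plain Java, with no Beam dependency; the class and method names are illustrative only, not Beam's API:]

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Beam dependency) of the batching pattern that
// GroupIntoBatches provides: many small elements are buffered and a
// downstream operation (e.g. a service call) runs once per batch of <= maxSize.
public class BatchSketch {
    // Splits the input into consecutive batches of at most maxSize elements;
    // the final batch may be partial.
    public static <T> List<List<T>> intoBatches(List<T> elements, int maxSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T e : elements) {
            current.add(e);
            if (current.size() == maxSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(intoBatches(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

[A splittable DoFn would instead take a single large element and split it; the sketch above covers only the batching side of the contrast.]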
On Mon, Jun 29, 2020 at 3:00 PM Siyuan Chen <syc...@google.com> wrote:
>
> --
> Best regards,
> Siyuan
>
> On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Great doc.
>>
>> The extended API makes sense. I like how it removes a knob. The question that I have to ask is whether there is a core model change here, or whether we can avoid one. Defining "shard" as a scope of state within which execution is observably serial, today the model always has key+window sharding. The two options mentioned seem to be:
>>
>> - a new primitive way for the runner to inject shard keys, so standard stateful ParDo works
>> - a new primitive stateful ParDo with a runner-defined scope of state that is not observable to users
>>
>> It seems the doc favors the latter. I like it better, too. I do not know how dynamic resharding would work for either of these. With dynamic resharding, the latter seems related to SDF.
>
> The runner-side implementation can be the same for both cases. One way would be to generate a shard number for a given input key when the data is emitted by an upstream transform and passed back to the runner; the runner can then distribute the data to the corresponding shard for processing GroupIntoBatches. It's dynamic in that data with the same key emitted at different times can be tagged with different shard numbers. SDF is mainly for sources AFAIU, so it is different (not familiar with that, so correct me if I am wrong :p).
>
>> Suppose we say that the semantics of the URN+payload "GroupIntoBatch { maxSize = <n> }" allow any segmentation of the PCollection into batches of size <= n. The runner can replace it with the proposed implementation. But what does the default implementation look like? If the default implementation is not good (like, say, "reshuffle") then maybe figuring out the primitive and adding it is better than leaving a transform that is sort of a hack when not replaced.
>
> It's a good point that we should think more carefully when extending the API. The default implementation of `ofSize()` or other similar limits like `ofBytes()` would behave like a normal stateful DoFn. (`ofMaxSize` sounds similar to `ofSize()`, since `ofSize()` guarantees emitting a fixed number of elements whenever possible and emitting a partial result otherwise.) Enabling runner-determined sharding on a runner that does not support dynamic sharding would fall back to the current (default) implementation, so there would be no behavioral change from today. Allowing the transform to accept unkeyed input might have to use a naive sharding though, e.g., pairing each element with a random key.
>
>> Kenn
>>
>> On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <tyso...@google.com> wrote:
>>>
>>>> Nice doc by the way, it's concise. Thanks for sharing, and I'm excited to see this feature, particularly the PCollection<T> variant that would have been useful for the recently introduced Cloud AI transforms.
>>>>
>>>> On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com> wrote:
>>>>
>>>>> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com> wrote:
>>>>>
>>>>>> Thanks Luke!
>>>>>>
>>>>>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are faced with similar issues with some sinks commonly used by Dataflow, such as the streaming BigQuery sink. Basically, elements are grouped and batched so that some subsequent (potentially expensive) operations can be performed at once on a batch of elements. One problem with the grouping is that it could impose a limit on the parallelism of the DoFn performing those operations.
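[The naive random-key sharding mentioned above for unkeyed input can be sketched in plain Java; the names here are illustrative, not part of any proposed Beam API:]

```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

// Sketch of the naive sharding fallback for unkeyed input: each element is
// paired with a random shard key in [0, numShards), which spreads the work
// across shards but provides no key-based locality.
public class RandomShard {
    public static <T> Map<Integer, List<T>> shard(List<T> elements, int numShards, Random rng) {
        // groupingBy collects elements under their randomly assigned shard key.
        return elements.stream()
                .collect(Collectors.groupingBy(e -> rng.nextInt(numShards)));
    }
}
```

[Every element lands in exactly one shard, so parallelism scales with numShards, but all state/locality guarantees tied to a meaningful key are lost.]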
>>>>>> To mitigate the limited-parallelism problem, I have recently been looking into the idea of improving the `GroupIntoBatches` transform to allow the grouping to be dynamically sharded and therefore distributed - essentially a "shardable" stateful DoFn. The transform already does grouping and batching on the input KV - grouping on K and batching on V - and it could be extended to shard the key and do batching within each shard (meaning that we would have a sharded form of keys somewhere). The idea is detailed in https://s.apache.org/sharded-group-into-batches
>>>>>>
>>>>>> Along with the proposal, there are two points I would like to ask for advice on:
>>>>>> - Would there be cases where the sharded keys need to be visible to users? One case where that might be needed would be applying another stateful DoFn to the sharded output of the GroupIntoBatches, so that the semantics of the key-to-user-state mapping are respected.
>>>>>
>>>>> Does exposing an API for the sharded keys change the implementation of the feature? If it is only an API change, I think it would be best to avoid exposing the keys to start with, to avoid any unnecessary dependency on the implementation. It seems like it could unnecessarily make it more difficult to modify the sharding implementation in the future.
>>>>
>>> Exposing the shard id makes it such that the shard id must cross the portability APIs during execution. If we don't expose it, then it could be implemented completely within the runner, and all the SDK has to do is say that this stateful transform supports sharding, or that this transform is the GroupIntoBatches transform.
>>>
>>>>>> - Would there be a need for a per-element shard id, or would a per-bundle shard id be sufficient?
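[The "sharded form of keys" idea above can be sketched as a composite key: the user key K is expanded to (K, shardId) so state and batching are scoped per shard while the original key stays recoverable. This class is illustrative only, not the class Beam would actually use:]

```java
import java.util.Objects;

// Sketch of a sharded key: a composite (key, shardId) used as the effective
// grouping key, so one hot user key can be spread over several shards.
public class ShardedKeySketch {
    public static final class ShardedKey<K> {
        public final K key;      // the original user key, still recoverable
        public final int shardId; // runner-chosen shard within the key

        ShardedKey(K key, int shardId) {
            this.key = key;
            this.shardId = shardId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ShardedKey)) return false;
            ShardedKey<?> other = (ShardedKey<?>) o;
            return Objects.equals(key, other.key) && shardId == other.shardId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, shardId);
        }
    }

    // In the real design the runner would pick the shard dynamically; a hash
    // of the key plus a salt stands in for that decision here.
    public static <K> ShardedKey<K> assignShard(K key, int salt, int numShards) {
        return new ShardedKey<>(key, Math.floorMod(Objects.hash(key, salt), numShards));
    }
}
```

[Because (K, shardId) is the effective state key, a downstream stateful DoFn keyed on the sharded output sees serial execution per shard, which is exactly why the visibility question above matters.]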
>>>>>> The former is more general, and we could still have the same shard id for all elements in a bundle. But the conclusion would potentially affect the implementation (like how the sharding information should be passed across the Fn API, for example).
>>>>>
>>>>> Are you referring to an API for a pipeline author to get the shard id? I thought that a bundle isn't a pipeline-author abstraction but an implementation detail; I may be wrong on this since I'm not too familiar with this area of the code. In the proposal it looks like the shard id isn't exposed. I prefer this, as I'm not sure there is any value for the user in having a specific 'shard id'. Is there?
>>>>
>>> A bundle is exposed to a pipeline author, since authors regularly want to know the lifetime of a bundle (e.g. startBundle/finishBundle on DoFn) to amortize setup/teardown at the bundle level.
>>>
>>> Exposing the shard id is the big question: is there a use case for it? I hope the community can provide guidance here; otherwise I'm with you and also agree that exposing it isn't necessary.
>>>
>>>>>> I'm very new to Beam, so I'm looking forward to hearing the thoughts of the community. Any comments will be appreciated :)
>>>>>> --
>>>>>> Best regards,
>>>>>> Siyuan
>>>>>>
>>>>>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> My first idea was to use a connection pool that is shared across the entire worker, across multiple bundles. The connection pool would TTL connections that have been unused. This would help a bunch, up until you hit the problem that you don't want every worker connected to every resource because of sharding of the work.
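[The bundle-level amortization mentioned above (startBundle/finishBundle) can be sketched in plain Java. The method names mirror DoFn's lifecycle, but this is a standalone class, not a Beam DoFn:]

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of why bundles matter to pipeline authors: elements are buffered in
// processElement and flushed once per bundle in finishBundle, amortizing one
// costly operation (e.g. an RPC) over the whole bundle.
public class BundleAmortization {
    private final List<String> buffer = new ArrayList<>();
    public int flushes = 0; // stand-in counter for batched service calls

    public void startBundle() {
        buffer.clear();
    }

    public void processElement(String element) {
        buffer.add(element); // cheap per-element work only
    }

    public void finishBundle() {
        if (!buffer.isEmpty()) {
            flushes++; // stand-in for one batched service call
            buffer.clear();
        }
    }
}
```

[Three elements processed within one bundle cost a single flush rather than three, which is the amortization the thread refers to.]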
>>>>>>> In this case we should really be making sure that workers that have processed the same "key" process the same "key" again, without limiting the number of workers that can process a specific key. This is very similar to what we do with a stateful DoFn, but one where the runner knows that it can "shard" the key. +Siyuan Chen <syc...@google.com> has been investigating something like this for Dataflow to solve scalability issues with the BigQuery sink, and has been looking into how a better GroupIntoBatches and/or a sharded stateful DoFn could really help in these situations. This applies in general to lots of cases where we want to co-locate things with the same key, but not limit the parallel processing to only a single worker the way stateful DoFn does today.
>>>>>>>
>>>>>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>>
>>>>>>>> We have been promoting the use of DoFn to write IO connectors for many reasons, including better composability. A common pattern that arises in such IOs is that a preceding transform prepares the specification element on split, which a subsequent DoFn uses to read the data.
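[Luke's worker-wide connection pool with TTL idea, from the message above, can be sketched in plain Java; "connection" is a placeholder for a real client object, and the time is passed in explicitly to keep the sketch deterministic:]

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of a worker-wide connection pool with TTL: connections are shared
// across bundles, keyed by their target, and evicted once idle past the TTL.
public class TtlPool<C> {
    private static final class Entry<C> {
        final C conn;
        volatile long lastUsedMillis;
        Entry(C conn, long now) { this.conn = conn; this.lastUsedMillis = now; }
    }

    private final Map<String, Entry<C>> pool = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public TtlPool(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns the pooled connection for this target, creating it on first use
    // and refreshing its last-used timestamp.
    public C acquire(String target, Function<String, C> factory, long nowMillis) {
        Entry<C> e = pool.computeIfAbsent(target, t -> new Entry<>(factory.apply(t), nowMillis));
        e.lastUsedMillis = nowMillis;
        return e.conn;
    }

    // Drops connections idle longer than the TTL; a real pool would also
    // close them before discarding.
    public void evictIdle(long nowMillis) {
        pool.values().removeIf(e -> nowMillis - e.lastUsedMillis > ttlMillis);
    }

    public int size() {
        return pool.size();
    }
}
```

[This helps until, as the message notes, sharding of the work means not every worker should hold a connection to every resource; key co-location is the complementary fix.]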
>>>>>>>> You can see an example of this on FileIO [1] or in RedisIO [2].
>>>>>>>>
>>>>>>>> The issue is that if we process that spec in the `@ProcessElement` method, we lose the DoFn lifecycle: we cannot establish a connection in `@Setup` and close it in `@Teardown`, because the spec is per element. So we end up re-creating connections, which is a quite costly operation in some systems like Cassandra/HBase/etc., and it could end up saturating the data store because of the massive creation of connections (something that already happened in the past with JdbcIO in the streaming case).
>>>>>>>>
>>>>>>>> In the ongoing PR that transforms Cassandra to be DoFn based [3], this subject appeared again, and we were discussing how to eventually reuse connections, maybe by the pretty naive approach of saving a previous connection (or a set of identified connections) statically, so it can be reused by multiple DoFn instances. We already had issues in the past because of creating many connections in other IOs (JdbcIO) with streaming pipelines, where databases were swamped by massive amounts of connections, so reusing connections seems to be something that matters, but at the moment we do not have a clear way to do this better.
>>>>>>>>
>>>>>>>> Anyone have better ideas or recommendations for this scenario? Thanks in advance.
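[The "naive" static reuse Ismaël describes can be sketched in plain Java: a static, worker-wide map of connections keyed by the connection spec, so multiple DoFn instances on the same worker share one connection instead of opening one per element. The spec string and counter are illustrative placeholders:]

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of static connection sharing across DoFn instances on one worker:
// the connection is opened at most once per spec, regardless of how many
// instances (or elements) ask for it.
public class SharedConnections {
    private static final Map<String, Object> CONNECTIONS = new ConcurrentHashMap<>();
    public static final AtomicInteger OPENED = new AtomicInteger(); // counts real connects

    // Returns the shared connection for a spec, opening it at most once.
    public static Object connectionFor(String spec) {
        return CONNECTIONS.computeIfAbsent(spec, s -> {
            OPENED.incrementAndGet(); // stand-in for the costly connect call
            return new Object();      // stand-in for a real client/connection
        });
    }
}
```

[A real implementation would also need reference counting or a TTL to know when to close connections, which is exactly the part the thread says is still unclear.]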
>>>>>>>>
>>>>>>>> Ismaël
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>>>>>> [2] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>>>>>> [3] https://github.com/apache/beam/pull/10546