Thanks a lot for the discussions and comments! My conclusions for the proposal (https://s.apache.org/sharded-group-into-batches) are as follows:
- Keep the existing API for GroupIntoBatches as is, with no exposure of the shard id.
- Enable runner-determined sharding as the default (and only) option for the Dataflow runner, and build an optimized version to override the default implementation.
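(For reference, the existing API stays exactly as it is today; a minimal usage sketch, where the element types and the batch size are only illustrative:)

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Batching per key with the unchanged API; no shard id ever appears in user code.
static PCollection<KV<String, Iterable<String>>> batchPerKey(
    PCollection<KV<String, String>> keyed) {
  return keyed.apply("BatchPerKey", GroupIntoBatches.<String, String>ofSize(100L));
}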
(ccing +datapls-plat-team@ for the following question) There weren't strong opinions on the approaches to implement Dataflow's optimization, i.e.:

a. Explicit approach: encode the shard id as part of the data by creating a type, ShardedKey. Well-known coders would need to be added for the type (see the strawman sketch below).
b. Implicit approach: append the shard id to the CacheToken. This assumes that one bundle is associated with one CacheToken and therefore one shard id; it would be hard to extend to the case where one bundle could have multiple keyed states.

If the assumption for b is reasonable and the extensibility is not a concern, I would go with approach b. Please advise if you have thoughts on this. Otherwise, I am planning to proceed with the implementation starting July 15th :)

--
Best regards,
Siyuan
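P.S. To make approach a more concrete, here is a rough strawman of what such a ShardedKey type and its coder could look like. The class shape, field names, and byte layout are only a sketch for discussion, not a settled design:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;

/** Strawman only: a user key paired with a runner-assigned shard id. */
public final class ShardedKey<K> {
  private final K key;
  private final int shardId;

  private ShardedKey(K key, int shardId) {
    this.key = key;
    this.shardId = shardId;
  }

  public static <K> ShardedKey<K> of(K key, int shardId) {
    return new ShardedKey<>(key, shardId);
  }

  public K getKey() { return key; }

  public int getShardId() { return shardId; }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ShardedKey)) {
      return false;
    }
    ShardedKey<?> that = (ShardedKey<?>) o;
    return shardId == that.shardId && Objects.equals(key, that.key);
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, shardId);
  }

  /** Strawman coder: a varint shard id followed by the user key, using the key's coder. */
  public static final class ShardedKeyCoder<K> extends StructuredCoder<ShardedKey<K>> {
    private final Coder<K> keyCoder;
    private final VarIntCoder shardCoder = VarIntCoder.of();

    public ShardedKeyCoder(Coder<K> keyCoder) {
      this.keyCoder = keyCoder;
    }

    @Override
    public void encode(ShardedKey<K> value, OutputStream outStream) throws IOException {
      shardCoder.encode(value.shardId, outStream);
      keyCoder.encode(value.key, outStream);
    }

    @Override
    public ShardedKey<K> decode(InputStream inStream) throws IOException {
      int shardId = shardCoder.decode(inStream);
      return ShardedKey.of(keyCoder.decode(inStream), shardId);
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
      return Collections.singletonList(keyCoder);
    }

    @Override
    public void verifyDeterministic() throws NonDeterministicException {
      keyCoder.verifyDeterministic();
    }
  }
}

The remaining piece would be registering this as a well-known (standard) coder so that runners and SDKs agree on the encoding, which is the part mentioned in approach a above.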
On Wed, Jul 1, 2020 at 8:53 PM Luke Cwik <lc...@google.com> wrote:

> I see, the splitting of state shards is related to the splitting of splittable DoFns.

> On Tue, Jun 30, 2020 at 3:36 PM Kenneth Knowles <k...@apache.org> wrote:

>> I agree at the level of GroupIntoBatches. The part that is similar is if you implement GroupIntoBatches with a new primitive supporting runner-scoped state, where the state is per-shard and the runner is able to split state shards.

>> Kenn

>> On Tue, Jun 30, 2020 at 9:22 AM Luke Cwik <lc...@google.com> wrote:

>>> I'm not sure that runner-determined sharding/GroupIntoBatches applies to splittable DoFns. Splittable DoFn is about taking one element and breaking a high fan-out / high-cost function down into smaller pieces, while runner-determined sharding/GroupIntoBatches is about taking a lot of small elements and doing something with all of them together at once (e.g. a service call).

>>> On Mon, Jun 29, 2020 at 3:00 PM Siyuan Chen <syc...@google.com> wrote:

>>>> --
>>>> Best regards,
>>>> Siyuan

>>>> On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <k...@apache.org> wrote:

>>>>> Great doc.

>>>>> The extended API makes sense. I like how it removes a knob. The question that I have to ask is whether there is a core model change here or whether we can avoid it. Defining "shard" as a scope of state within which execution is observably serial, today the model always has key+window sharding. It seems like the two options mentioned are:

>>>>> - a new primitive way for the runner to inject shard keys, so standard stateful ParDo works
>>>>> - a new primitive stateful ParDo with a runner-defined scope of state that is not observable to users

>>>>> It seems from the doc that the latter is favored. I like it better, too. I do not know how dynamic resharding would work for either of these. With dynamic resharding the latter seems related to SDF.

>>>> The runner-side implementation can be the same for both cases. One way would be to generate a shard number for a given input key when the data is emitted by an upstream transform and passed back to the runner; the runner can then distribute the data to the corresponding shard for processing GroupIntoBatches. It's dynamic in that data with the same key emitted at different times can be tagged with different shard numbers. SDF is mainly for sources AFAIU, so it is different (I'm not familiar with that, so correct me if I am wrong :p).

>>>>> Suppose we say that the semantics of the URN+payload "GroupIntoBatch { maxSize = <n> }" allow any segmentation of the PCollection into batches of size <= n. The runner can replace it with the proposed implementation. But what does the default implementation look like? If the default implementation is not good (like, say, "reshuffle") then maybe figuring out the primitive and adding it is better than leaving a transform that is sort of a hack when not replaced.

>>>> It's a good point that we should think more carefully when extending the API. The default implementation of `ofSize()` or other similar limits like `ofBytes()` would behave like a normal stateful DoFn. (`ofMaxSize` sounds similar to `ofSize()`, since ofSize() guarantees emitting a fixed number of elements whenever possible and emitting a partial result otherwise.) Enabling runner-determined sharding on a runner that does not support dynamic sharding would fall back to the current (default) implementation, so there would be no behavioral change compared to today. Allowing the transform to accept unkeyed input might have to use a naive sharding though, e.g., pairing each element with a random key.

>>>>> Kenn

>>>>> On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <lc...@google.com> wrote:

>>>>>> On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <tyso...@google.com> wrote:

>>>>>>> Nice doc by the way, it's concise. Thanks for sharing and I'm excited to see this feature, particularly the PCollection<T> variant that would have been useful for the Cloud AI transforms recently introduced.

>>>>>>> On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com> wrote:

>>>>>>>> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com> wrote:

>>>>>>>>> Thanks Luke!

>>>>>>>>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are faced with similar issues with some sinks commonly used by Dataflow, such as the streaming BigQuery sink. Basically the elements are grouped and batched so that some (potentially expensive) subsequent operations can be performed at once on a batch of elements. One problem with the grouping is that it could impose a limit on the parallelism of the DoFn performing those operations. To mitigate the limited-parallelism problem, recently I have been looking into the idea of improving the `GroupIntoBatches` transform to allow the grouping to be dynamically sharded and therefore distributed - essentially a "shardable" stateful DoFn. The transform already does grouping and batching on the input KV - grouping on K and batching on V - and it could be extended to be able to shard the key and do batching within each shard (meaning that we would have a sharded form of keys somewhere). The idea is detailed in https://s.apache.org/sharded-group-into-batches

>>>>>>>>> Along with the proposal, there are two points I would like to ask for advice on:

>>>>>>>>> - Would there be cases where the sharded keys need to be visible to users?
>>>>>>>>> One case where that might be needed would be to apply another stateful DoFn to the sharded output of GroupIntoBatches, so that the semantics of the key-to-user-state mapping are respected.

>>>>>>>> Does exposing an API for the sharded keys change the implementation of the feature? If it is only an API change, I think it would be best to avoid exposing the keys to start with, to avoid any unnecessary dependency on the implementation. It seems like it could unnecessarily make it more difficult to modify the sharding implementation in the future.

>>>>>> Exposing the shard id makes it such that the shard id must cross the portability APIs during execution. If we don't expose it, then it could be implemented completely within the runner, and all the SDK has to do is say that this stateful transform supports sharding, or that this transform is the GroupIntoBatches transform.

>>>>>>>>> - Would there be a need for a per-element shard id, or would a per-bundle shard id be sufficient? The former is more general, and we could still have the same shard id for all elements in a bundle. But the conclusion could affect the implementation (for example, how the sharding information should be passed across the Fn API).

>>>>>>>> Are you referring to an API for a pipeline author to get the shard id? I thought that a bundle isn't a pipeline-author abstraction but an implementation detail; I may be wrong since I'm not too familiar with this area of the code. In the proposal it looks like the shard id isn't exposed. I prefer this, as I'm not sure there is any value for the user in having a specific "shard id". Is there?

>>>>>> A bundle is exposed to a pipeline author since they regularly want to know the lifetime of a bundle (e.g. startBundle/finishBundle on DoFn) to amortize setup/teardown at the bundle level.

>>>>>> Exposing the shard id is the big question: is there a use case for it? I hope the community can provide guidance here; otherwise I'm with you and also agree that exposing it isn't necessary.

>>>>>>>>> I'm very new to Beam so I'm looking forward to hearing the thoughts from the community. Any comments will be appreciated :)
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Siyuan

>>>>>>>>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:

>>>>>>>>>> My first idea was to use a connection pool that is shared across the entire worker, across multiple bundles. The connection pool would expire (TTL) connections that have been unused. This would help a bunch, up until you hit the problem where you don't want every worker connected to every resource because of the sharding of the work. In this case we should really be making sure that workers that have processed the same "key" process the same "key" again, without limiting the number of workers that can process a specific key.
>>>>>>>>>> This is very similar to what we do with a stateful DoFn, but one where the runner knows that it can "shard" the key. +Siyuan Chen <syc...@google.com> has been investigating something like this for Dataflow to solve scalability issues with the BigQuery sink, and has been looking into how a better GroupIntoBatches and/or a sharded stateful DoFn could really help in these situations. This applies in general to lots of things where we want to co-locate things with the same key but not limit the parallel processing to only a single worker, like stateful DoFn does today.

>>>>>>>>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com> wrote:

>>>>>>>>>>> We have been promoting the use of DoFn to write IO connectors for many reasons, including better composability. A common pattern that arrives in such IOs is that a preceding transform prepares the specification element on split that a subsequent DoFn uses to read the data. You can see an example of this on FileIO [1] or in RedisIO [2].

>>>>>>>>>>> The issue is that if we process that spec in the `@ProcessElement` method we lose the DoFn lifecycle, because we cannot establish a connection on `@Setup` and close it in `@Teardown` given that the spec is per element. So we end up re-creating connections, which is quite a costly operation in some systems like Cassandra/HBase/etc, and it could end up saturating the data store because of the massive creation of connections (something that already happened in the past with JdbcIO in the streaming case).

>>>>>>>>>>> In the ongoing PR that transforms the Cassandra IO to be DoFn based [3] this subject appeared again, and we were discussing how to eventually reuse connections, maybe by a pretty naive approach of saving a previous connection (or a set of identified connections) statically so it can be reused by multiple DoFn instances. We already had some issues in the past because of creating many connections on other IOs (JdbcIO) with streaming pipelines, where databases were swamped by massive amounts of connections, so reusing connections seems to be something that matters, but at the moment we do not have a clear way to do this better.

>>>>>>>>>>> Anyone have better ideas or recommendations for this scenario? Thanks in advance.

>>>>>>>>>>> Ismaël

>>>>>>>>>>> [1] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>>>>>>>>> [2] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>>>>>>>>> [3] https://github.com/apache/beam/pull/10546
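(A side note on the connection-reuse discussion quoted at the bottom of this thread: the worker-wide pool Luke described could look roughly like the sketch below, i.e. a statically shared, TTL-evicting cache of connections keyed by their spec, so that DoFn instances on the same worker reuse connections across bundles. The class, the generic spec/connection types, and the TTL value are all illustrative, not an existing Beam API.)

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Sketch of a process-wide connection cache shared by all DoFn instances on a worker.
 * Connections are created lazily per spec and evicted after sitting idle for a TTL.
 * The spec and connection types are placeholders for an IO's real client types.
 */
final class SharedConnectionPool<SpecT, ConnT extends AutoCloseable> {
  private static final long IDLE_TTL_MILLIS = 5 * 60 * 1000;

  private final ConcurrentMap<SpecT, Entry<ConnT>> cache = new ConcurrentHashMap<>();
  private final Function<SpecT, ConnT> connector;

  SharedConnectionPool(Function<SpecT, ConnT> connector) {
    this.connector = connector;
  }

  /** Returns a live connection for the given spec, creating one if needed. */
  ConnT acquire(SpecT spec) {
    Entry<ConnT> entry = cache.computeIfAbsent(spec, s -> new Entry<>(connector.apply(s)));
    entry.lastUsedMillis = System.currentTimeMillis();
    return entry.connection;
  }

  /** Closes and removes connections that have been idle for longer than the TTL. */
  void evictIdle() {
    long now = System.currentTimeMillis();
    cache.entrySet().removeIf(e -> {
      if (now - e.getValue().lastUsedMillis <= IDLE_TTL_MILLIS) {
        return false;
      }
      try {
        e.getValue().connection.close();
      } catch (Exception ignored) {
        // Best effort: the connection is being discarded anyway.
      }
      return true;
    });
  }

  private static final class Entry<C> {
    final C connection;
    volatile long lastUsedMillis = System.currentTimeMillis();

    Entry(C connection) {
      this.connection = connection;
    }
  }
}

A DoFn could hold such a pool in a static field and call acquire(spec) from @ProcessElement or @StartBundle; a real version would also need to avoid evicting connections that are still in use (e.g. with reference counting).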