I see, the splitting of state shards is related to the splitting of splittable DoFns.
On Tue, Jun 30, 2020 at 3:36 PM Kenneth Knowles <k...@apache.org> wrote:

> I agree at the level of GroupIntoBatches. The part that is similar is if you implement GroupIntoBatches with a new primitive supporting runner-scoped state, where the state is per-shard and the runner is able to split state shards.
>
> Kenn
>
> On Tue, Jun 30, 2020 at 9:22 AM Luke Cwik <lc...@google.com> wrote:
>
>> I'm not sure that runner-determined sharding/GroupIntoBatches applies to splittable DoFns. Splittable DoFns are about taking one element with a high fan-out / high-cost function and breaking it down into smaller pieces, while runner-determined sharding/GroupIntoBatches is about taking a lot of small elements and doing something with all of them together at once (e.g. a service call).
>>
>> On Mon, Jun 29, 2020 at 3:00 PM Siyuan Chen <syc...@google.com> wrote:
>>
>>> --
>>> Best regards,
>>> Siyuan
>>>
>>> On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Great doc.
>>>>
>>>> The extended API makes sense. I like how it removes a knob. The question that I have to ask is whether there is a core model change here or whether we can avoid it. Defining "shard" as a scope of state within which execution is observably serial, today the model always has key+window sharding. It seems like the two options mentioned are:
>>>>
>>>> - a new primitive way for the runner to inject shard keys, so standard stateful ParDo works
>>>> - a new primitive stateful ParDo with a runner-defined scope of state that is not observable to users
>>>>
>>>> It seems from the doc that the latter is favored. I like it better, too. I do not know how dynamic resharding would work for either of these. With dynamic resharding the latter seems related to SDF.
>>>
>>> The runner-side implementation can be the same for both cases. One way would be to generate a shard number for a given input key when the data is emitted by an upstream transform and passed back to the runner, and then the runner can distribute the data to the corresponding shard for processing GroupIntoBatches. It's dynamic in that data with the same key emitted at different times can be tagged with different shard numbers. SDF is mainly for sources AFAIU, so it is different (not familiar with that, so correct me if I am wrong :p).
>>>
>>>> Suppose we say that the semantics of the URN+payload "GroupIntoBatch { maxSize = <n> }" allow any segmentation of the PCollection into batches of size <= n. The runner can replace it with the proposed implementation. But what does the default implementation look like? If the default implementation is not good (like, say, "reshuffle") then maybe figuring out the primitive and adding it is better than leaving a transform that is sort of a hack when not replaced.
>>>
>>> It's a good point that we should think more carefully when extending the API. The default implementation of `ofSize()` or other similar limits like `ofBytes()` would behave like a normal stateful DoFn. (`ofMaxSize` sounds similar to `ofSize()`, since `ofSize()` guarantees emitting a fixed number of elements whenever possible and emits a partial result otherwise.) Enabling runner-determined sharding on a runner that does not support dynamic sharding would fall back to the current (default) implementation, so there would be no behavioral changes compared to today. Allowing the transform to accept unkeyed input might have to use a naive sharding though, e.g., pairing each element with a random key.
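(As an aside, a rough sketch of what that naive fallback could look like with the existing API: pair each element with a random shard key up front and then apply GroupIntoBatches.ofSize() as today. The shard count and the helper DoFn are made up for illustration and are not part of the proposal.)

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Pairs each element with a random shard key so batching can fan out across shards. */
class AssignRandomShard extends DoFn<String, KV<Integer, String>> {
  private final int numShards; // illustrative knob, not a proposed API

  AssignRandomShard(int numShards) {
    this.numShards = numShards;
  }

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<KV<Integer, String>> out) {
    out.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), element));
  }
}

// Usage sketch: batches of up to 100 elements per random shard, with the existing transform.
// PCollection<KV<Integer, Iterable<String>>> batches =
//     unkeyed.apply(ParDo.of(new AssignRandomShard(16)))
//            .apply(GroupIntoBatches.<Integer, String>ofSize(100));

Unlike runner-determined sharding, the shard count here is fixed up front and the runner cannot adapt it at runtime, which is presumably why this is only described as a naive fallback for unkeyed input.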
>>>> Kenn
>>>>
>>>> On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <tyso...@google.com> wrote:
>>>>>
>>>>>> Nice doc by the way, it's concise. Thanks for sharing and I'm excited to see this feature, particularly the PCollection<T> variant that would have been useful for the Cloud AI transforms recently introduced.
>>>>>>
>>>>>> On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com> wrote:
>>>>>>>
>>>>>>>> Thanks Luke!
>>>>>>>>
>>>>>>>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are faced with similar issues with some sinks commonly used by Dataflow, such as the streaming BigQuery sink. Basically the elements are grouped and batched so that some following operations (potentially expensive) can be performed at once on a batch of elements. One problem with the grouping is that it could impose a limit on the parallelism of the DoFn performing those operations. To mitigate the limited-parallelism problem, I have recently been looking into the idea of improving the `GroupIntoBatches` transform to allow the grouping to be dynamically sharded and therefore distributed - essentially a "shardable" stateful DoFn. The transform already does grouping and batching on the input KV - grouping on K and batching on V - and it could be extended to be able to shard the key and do batching within each shard (meaning that we would have a sharded form of keys somewhere). The idea is detailed in https://s.apache.org/sharded-group-into-batches
>>>>>>>>
>>>>>>>> Along with the proposal, there are two points I would like to ask for advice on:
>>>>>>>>
>>>>>>>> - Would there be cases where the sharded keys need to be visible to users? One case where that might be needed would be to apply another stateful DoFn to the sharded output of the GroupIntoBatches, so that the semantics of the key-to-user-state mapping are respected.
>>>>>>>
>>>>>>> Does exposing an API for the sharded keys change the implementation of the feature? If it is only an API change I think it would be best to avoid exposing the keys to start with, to avoid any unnecessary dependency on the implementation. It seems like it could unnecessarily make it more difficult to modify the sharding implementation in the future.
>>>>>
>>>>> Exposing the shard id makes it such that the shard id must cross the portability APIs during execution. If we don't expose it then it could be implemented completely within the runner, and all the SDK has to do is say that this stateful transform supports sharding or that this transform is the GroupIntoBatches transform.
>>>>>
>>>>>>>> - Would there be a need for a per-element shard id, or would a per-bundle shard id be sufficient? The former is more general and we could still have the same shard id for all elements in a bundle.
>>>>>>>> But the conclusion would potentially affect the implementation approach (like how the sharding information should be passed across FnAPI, for example).
>>>>>>>
>>>>>>> Are you referring to an API for a pipeline author to get the shard id? I thought that a bundle isn't a pipeline-author abstraction but an implementation detail; I may be wrong about this since I'm not too familiar with this area of the code. In the proposal it looks like the shard id isn't exposed. I prefer this, as I'm not sure there is any value for the user in having a specific 'shard id'. Is there?
>>>>>
>>>>> A bundle is exposed to a pipeline author since they regularly want to know the lifetime of a bundle (e.g. startBundle/finishBundle on DoFn) to amortize setup/teardown at the bundle level.
>>>>>
>>>>> Exposing the shard id is the big question: is there a use case for it? I hope the community can provide guidance here; otherwise I'm with you and also agree that exposing it isn't necessary.
>>>>>
>>>>>>>> I'm very new to Beam so I'm looking forward to hearing the thoughts of the community. Any comments will be appreciated :)
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Siyuan
>>>>>>>>
>>>>>>>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> My first idea was to use a connection pool that is shared across the entire worker, across multiple bundles. The connection pool would TTL connections that have been unused. This would help a bunch, up until you hit the problem where you don't want every worker connected to every resource because of sharding of the work. In this case we should really be making sure that workers that have processed the same "key" process the same "key" again, without limiting the number of workers that can process a specific key. This is very similar to what we do with a stateful DoFn, but one where the runner knows that it can "shard" the key. +Siyuan Chen <syc...@google.com> has been investigating something like this for Dataflow to solve scalability issues with the BigQuery sink and has been looking into how a better GroupIntoBatches and/or a sharded stateful DoFn could really help in these situations. This applies in general to lots of things where we want to co-locate things with the same key but not limit the parallel processing to only a single worker like stateful DoFn does today.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> We have been promoting the use of DoFn to write IO connectors for many reasons, including better composability. A common pattern that arises in such IOs is that a preceding transform prepares the specification element on split that a subsequent DoFn uses to read the data.
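(To make the shape of that pattern concrete, here is a minimal illustrative sketch, using plain JDBC rather than any real Beam IO: the read spec only arrives as an element, so the natural place to open and close the connection ends up being @ProcessElement rather than @Setup/@Teardown.)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/**
 * Illustrative only: the element carries the read spec (a JDBC URL paired with a
 * query), so a connection is created and torn down once per element.
 */
class ReadPerElementFn extends DoFn<KV<String, String>, String> {

  @ProcessElement
  public void processElement(@Element KV<String, String> urlAndQuery,
                             OutputReceiver<String> out) throws Exception {
    // A new connection per element: the costly pattern being discussed.
    try (Connection conn = DriverManager.getConnection(urlAndQuery.getKey());
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(urlAndQuery.getValue())) {
      while (rs.next()) {
        out.output(rs.getString(1));
      }
    }
  }
}

Every element pays the full connection setup and teardown cost, which is the behavior the rest of the thread is trying to avoid.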
>>>>>>>>>> You can see an example of this on FileIO [1] or in RedisIO [2].
>>>>>>>>>>
>>>>>>>>>> The issue is that if we process that spec in the `@ProcessElement` method we lose the DoFn lifecycle, because we cannot establish a connection in `@Setup` and close it in `@Teardown` since the spec is per element. So we end up re-creating connections, which is a quite costly operation in some systems like Cassandra/HBase/etc, and it could end up saturating the data store because of the massive creation of connections (something that already happened in the past with JdbcIO in the streaming case).
>>>>>>>>>>
>>>>>>>>>> In the ongoing PR that transforms Cassandra to be DoFn based [3] this subject appeared again, and we were discussing how to eventually reuse connections, maybe by a pretty naive approach of saving a previous connection (or a set of identified connections) statically so it can be reused by multiple DoFn instances. We already had some issues in the past because of creating many connections on other IOs (JdbcIO) with streaming pipelines, where databases were swamped by massive amounts of connections, so reusing connections seems to be something that matters, but at the moment we do not have a clear way to do this better.
>>>>>>>>>>
>>>>>>>>>> Anyone have better ideas or recommendations for this scenario? Thanks in advance.
>>>>>>>>>>
>>>>>>>>>> Ismaël
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>>>>>>>> [2] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>>>>>>>> [3] https://github.com/apache/beam/pull/10546
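(A rough sketch of the "save connections statically so multiple DoFn instances can reuse them" idea above, combined with Luke's worker-level TTL suggestion. Everything here is an assumption made for illustration: the cache key being the connection URL, the TTL value, and the eviction strategy are not a settled design.)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Sketch of a worker-scoped connection cache: DoFn instances on the same worker
 * look up a shared connection by its spec (here just a JDBC URL) instead of each
 * opening their own. Idle connections are closed by a best-effort sweep.
 */
final class ConnectionCache {
  private static final long IDLE_TTL_MILLIS = 60_000;
  private static final ConcurrentMap<String, Entry> CACHE = new ConcurrentHashMap<>();

  private static final class Entry {
    final Connection connection;
    volatile long lastUsedMillis;

    Entry(Connection connection) {
      this.connection = connection;
      this.lastUsedMillis = System.currentTimeMillis();
    }
  }

  /** Returns a shared connection for the spec, creating it on first use. */
  static Connection acquire(String jdbcUrl) {
    return CACHE.compute(jdbcUrl, (url, existing) -> {
      if (existing != null) {
        existing.lastUsedMillis = System.currentTimeMillis();
        return existing;
      }
      try {
        return new Entry(DriverManager.getConnection(url));
      } catch (SQLException e) {
        throw new RuntimeException("Failed to open connection for " + url, e);
      }
    }).connection;
  }

  /** Closes and evicts connections that have been idle for longer than the TTL. */
  static void evictIdle() {
    long cutoff = System.currentTimeMillis() - IDLE_TTL_MILLIS;
    CACHE.entrySet().removeIf(entry -> {
      if (entry.getValue().lastUsedMillis >= cutoff) {
        return false;
      }
      try {
        entry.getValue().connection.close();
      } catch (SQLException ignored) {
        // Best effort: the connection is dropped from the cache either way.
      }
      return true;
    });
  }

  private ConnectionCache() {}
}

DoFn instances would call ConnectionCache.acquire(...) from @ProcessElement (or from @Setup when the spec is static) and something on the worker would periodically call evictIdle(); a real version would also need reference counting or health checks so a sweep cannot close a connection that is still in use.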