Nice doc, by the way; it's concise. Thanks for sharing. I'm excited to see this feature, particularly the PCollection<T> variant, which would have been useful for the recently introduced Cloud AI transforms.
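To make sure I'm reading the proposal right, here is a rough sketch of what the KV form could look like from the pipeline author's side. This is just my sketch: the withShardedKey() name and the ShardedKey<K> output type are my reading of the doc, not a committed API.

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.GroupIntoBatches;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  Pipeline pipeline = Pipeline.create();

  PCollection<KV<String, String>> input =
      pipeline.apply(Create.of(KV.of("hot-key", "a"), KV.of("hot-key", "b")));

  // Hypothetical: withShardedKey() and ShardedKey<K> come from the
  // proposal doc, not from a released Beam API.
  PCollection<KV<ShardedKey<String>, Iterable<String>>> batches =
      input.apply(GroupIntoBatches.<String, String>ofSize(100).withShardedKey());

  // Each batch still holds values for a single user key, but a hot key
  // can now fan out across shards (and therefore workers) instead of
  // being serialized on one worker by the stateful DoFn's single key.

If that matches the intent, the PCollection<T> variant would presumably just hide the key entirely.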
On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com> wrote:

> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com> wrote:
>
>> Thanks Luke!
>>
>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are
>> facing similar issues with some sinks commonly used by Dataflow, such
>> as the streaming BigQuery sink. Basically, the elements are grouped
>> and batched so that some subsequent (potentially expensive) operations
>> can be performed at once on a batch of elements. One problem with the
>> grouping is that it can impose a limit on the parallelism of the DoFn
>> performing those operations. To mitigate the limited-parallelism
>> problem, I have recently been looking into the idea of improving the
>> `GroupIntoBatches` transform to allow the grouping to be dynamically
>> sharded and therefore distributed - essentially a "shardable" stateful
>> DoFn. The transform already does grouping and batching on the input
>> KV - grouping on K and batching on V - and it could be extended to
>> shard the key and do the batching within each shard (meaning that we
>> would have a sharded form of the keys somewhere). The idea is detailed
>> in https://s.apache.org/sharded-group-into-batches
>>
>> Along with the proposal, there are two points I would like to ask for
>> advice on:
>>
>> - Would there be cases where the sharded keys need to be visible to
>> users? One case where that might be needed would be applying another
>> stateful DoFn to the sharded output of the GroupIntoBatches, so that
>> the semantics of the key-to-user-state mapping are respected.
>
> Does exposing an API for the sharded keys change the implementation of
> the feature? If it is only an API change, I think it would be best to
> avoid exposing the keys to start with, to avoid any unnecessary
> dependency on the implementation. Exposing them now could unnecessarily
> make it more difficult to change the sharding implementation in the
> future.
>
>> - Would there be a need for a per-element shard id, or would a
>> per-bundle shard id be sufficient? The former is more general, and we
>> could still assign the same shard id to all elements in a bundle. But
>> the conclusion could affect the implementation (for example, how the
>> sharding information should be passed across the FnAPI).
>
> Are you referring to an API for a pipeline author to get the shard id?
> I thought a bundle wasn't a pipeline-author abstraction but an
> implementation detail; I may be wrong about this, since I'm not too
> familiar with this area of the code. In the proposal it looks like the
> shard id isn't exposed. I prefer this, as I'm not sure there is any
> value for the user in having a specific "shard id". Is there?
>
>> I'm very new to Beam, so I'm looking forward to hearing the thoughts
>> of the community. Any comments will be appreciated :)
>> --
>> Best regards,
>> Siyuan
>>
>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> My first idea was to use a connection pool that is shared across the
>>> entire worker, across multiple bundles. The connection pool would TTL
>>> connections that have been unused. This would help a lot, up until
>>> you hit the problem where you don't want every worker connected to
>>> every resource because of how the work is sharded. In that case we
>>> should really be making sure that workers that have processed the
>>> same "key" process the same "key" again, without limiting the number
>>> of workers that can process a specific key. This is very similar to
>>> what we do with a stateful DoFn, but one where the runner knows that
>>> it can "shard" the key. +Siyuan Chen <syc...@google.com> has been
>>> investigating something like this for Dataflow to solve scalability
>>> issues with the BigQuery sink and has been looking into how a better
>>> GroupIntoBatches and/or a sharded stateful DoFn could really help in
>>> these situations. This applies in general to lots of cases where we
>>> want to co-locate things with the same key but not limit the parallel
>>> processing to a single worker, as stateful DoFn does today.
>>>
>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>
>>>> We have been promoting the use of DoFn to write IO connectors for
>>>> many reasons, including better composability. A common pattern that
>>>> arises in such IOs is that a preceding transform prepares the
>>>> specification element on split, which a subsequent DoFn then uses to
>>>> read the data. You can see an example of this in FileIO [1] or in
>>>> RedisIO [2].
>>>>
>>>> The issue is that if we process that spec in the `@ProcessElement`
>>>> method, we lose the DoFn lifecycle: we cannot establish a connection
>>>> in `@Setup` and close it in `@Teardown` because the spec is per
>>>> element, so we end up re-creating connections, which is a quite
>>>> costly operation in some systems like Cassandra/HBase/etc., and it
>>>> could end up saturating the data store because of the massive
>>>> creation of connections (something that has already happened in the
>>>> past with JdbcIO in the streaming case).
>>>>
>>>> In the ongoing PR that transforms Cassandra to be DoFn based [3],
>>>> this subject came up again, and we were discussing how to eventually
>>>> reuse connections, maybe by a pretty naive approach of saving a
>>>> previous connection (or a set of identified connections) statically
>>>> so it can be reused by multiple DoFn instances. We have already had
>>>> issues in the past because of creating many connections in other IOs
>>>> (JdbcIO) with streaming pipelines, where databases were swamped by
>>>> massive numbers of connections, so reusing connections seems to be
>>>> something that matters, but at the moment we do not have a clear way
>>>> to do it better.
>>>>
>>>> Does anyone have better ideas or recommendations for this scenario?
>>>> Thanks in advance.
>>>>
>>>> Ismaël
>>>>
>>>> [1] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>> [2] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>> [3] https://github.com/apache/beam/pull/10546
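
One more thought on Ismaël's connection reuse question at the bottom of the thread: to make the "save a previous connection statically" idea concrete, below is a naive sketch of a worker-wide pool keyed by the connection spec, with the TTL behavior Luke described delegated to Guava's CacheBuilder. ConnectionSpec and Connection are made-up stand-ins here (think a Cassandra cluster config and session), not real Beam or driver types.

  import java.util.concurrent.TimeUnit;
  import com.google.common.cache.CacheBuilder;
  import com.google.common.cache.CacheLoader;
  import com.google.common.cache.LoadingCache;
  import com.google.common.cache.RemovalListener;
  import org.apache.beam.sdk.transforms.DoFn;

  class ConnectionPool {
    // Closes a connection when its cache entry is evicted.
    private static final RemovalListener<ConnectionSpec, Connection> CLOSER =
        notification -> notification.getValue().close();

    // One cache per worker JVM, shared by all DoFn instances across
    // bundles. Connections unused for a minute are closed and evicted.
    private static final LoadingCache<ConnectionSpec, Connection> CONNECTIONS =
        CacheBuilder.newBuilder()
            .expireAfterAccess(1, TimeUnit.MINUTES)
            .removalListener(CLOSER)
            .build(
                new CacheLoader<ConnectionSpec, Connection>() {
                  @Override
                  public Connection load(ConnectionSpec spec) {
                    return spec.connect();
                  }
                });

    // ConnectionSpec must implement equals()/hashCode(), since it is
    // the cache key.
    static Connection get(ConnectionSpec spec) {
      return CONNECTIONS.getUnchecked(spec);
    }
  }

  // The read DoFn borrows the shared connection per element instead of
  // opening one in @Setup (which it cannot do, because the spec only
  // arrives as the element):
  class ReadFn extends DoFn<ConnectionSpec, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Connection connection = ConnectionPool.get(c.element());
      // ... read using connection and emit results via c.output(...).
      // Do not close the connection here; the cache owns its lifecycle.
    }
  }

The obvious downside is the one Luke already pointed out: every worker can still end up holding a connection to every resource, so this only postpones the problem that key-aware sharding would actually solve.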