Thanks Luke! Hi, I'm Siyuan and I work on the Google Dataflow team. We face similar issues with some sinks commonly used by Dataflow, such as the streaming BigQuery sink. Basically, elements are grouped and batched so that subsequent (potentially expensive) operations can be performed at once on a whole batch. One problem with this grouping is that it can limit the parallelism of the DoFn performing those operations. To mitigate the limited-parallelism problem, I have recently been looking into improving the `GroupIntoBatches` transform so that the grouping can be dynamically sharded and therefore distributed - essentially a "shardable" stateful DoFn. The transform already does grouping and batching on the input KV - grouping on K and batching on V - and it could be extended to shard the key and do the batching within each shard (meaning that we would have a sharded form of keys somewhere). The idea is detailed in https://s.apache.org/sharded-group-into-batches
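
To make this a bit more concrete, here is a minimal sketch of the batching we can do today with the existing `GroupIntoBatches.ofSize`, with a comment on what a sharded variant might look like. The `ShardedKey` type mentioned in the comment is purely illustrative and not an existing API:

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class BatchingSketch {
  // Today: GroupIntoBatches groups on K and batches the Vs per key, so all
  // batches for a given key are produced under that single key's state,
  // which caps the parallelism for hot keys.
  static PCollection<KV<String, Iterable<String>>> batchPerKey(
      PCollection<KV<String, String>> rows) {
    return rows.apply(GroupIntoBatches.<String, String>ofSize(500));
  }

  // A sharded variant could instead emit batches keyed by something like
  // KV<ShardedKey<String>, Iterable<String>> (ShardedKey here is purely
  // illustrative), letting the runner split a hot key into several shards
  // and batch within each shard on different workers.
}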
Along with the proposal, there are two points I would like to ask for advice on:
- Would there be cases where the sharded keys need to be visible to users? One such case might be applying another stateful DoFn to the sharded output of the GroupIntoBatches, so that the semantics of the key-to-user-state mapping are respected.
- Would there be a need for a per-element shard id, or would a per-bundle shard id be sufficient? The former is more general, and we could still assign the same shard id to all elements in a bundle, but the conclusion could affect the implementation (for example, how the sharding information should be passed across the FnAPI).

I'm very new to Beam, so I'm looking forward to hearing the thoughts of the community. Any comments will be appreciated :)

--
Best regards,
Siyuan

On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <[email protected]> wrote:

> My first idea was to use a connection pool that is shared across the
> entire worker across multiple bundles. The connection pool would TTL
> connections that have been unused. This would help a bunch up until you hit
> the problem where you don't want every worker connected to every resource
> because of sharding of the work. In this case we should really be making
> sure that workers that have processed the same "key" process the same "key"
> again without limiting the number of workers that can process a specific
> key. This is very similar to what we do with a stateful DoFn, but one where
> the runner knows that it can "shard" the key. +Siyuan Chen
> <[email protected]> has been investigating something like this for
> Dataflow to solve scalability issues with the BigQuery sink and has been
> looking into how a better GroupIntoBatches and/or a sharded stateful DoFn
> could really help in these situations. This applies in general to lots of
> cases where we want to co-locate elements with the same key but not limit
> the parallel processing to only a single worker, like stateful DoFn does
> today.
>
> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <[email protected]> wrote:
>
>> We have been promoting the use of DoFn to write IO connectors for many
>> reasons, including better composability. A common pattern that arises in
>> such IOs is that a preceding transform prepares the specification element
>> per split that a subsequent DoFn then uses to read the data. You can see
>> an example of this in FileIO [1] or in RedisIO [2].
>>
>> The issue is that if we process that spec in the `@ProcessElement` method,
>> we lose the DoFn lifecycle: we cannot establish a connection in `@Setup`
>> and close it in `@Teardown` because the spec is per element, so we end up
>> re-creating connections, which is a quite costly operation in some systems
>> like Cassandra/HBase/etc. and which could end up saturating the data store
>> because of the massive creation of connections (something that already
>> happened in the past with JdbcIO in the streaming case).
>>
>> In the ongoing PR that transforms Cassandra to be DoFn based [3] this
>> subject appeared again, and we were discussing how to eventually reuse
>> connections, maybe by a pretty naive approach of saving a previous
>> connection (or set of identified connections) statically so it can be
>> reused by multiple DoFn instances.
>> We already had some issues in the past because of creating many
>> connections in other IOs (JdbcIO) with streaming pipelines, where
>> databases were swamped by massive amounts of connections, so reusing
>> connections seems to be something that matters, but at the moment we do
>> not have a clear way to do this better.
>>
>> Does anyone have better ideas or recommendations for this scenario?
>> Thanks in advance.
>>
>> Ismaël
>>
>> [1]
>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>> [2]
>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>> [3] https://github.com/apache/beam/pull/10546
>
>
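
To make the connection reuse idea above a bit more concrete, here is a rough sketch of the kind of worker-wide, TTL'd cache Luke describes, keyed by the per-element spec since the connection cannot be opened in `@Setup`. The `Connection` and `ConnectionSpec` types are hypothetical stand-ins for the real client classes (Cassandra, JDBC, etc.), and the Guava cache is just one way to get the TTL behavior, not a proposed API:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical types: a per-element read spec (must have proper
// equals/hashCode to be a cache key) and the expensive client it opens.
interface Connection extends AutoCloseable { Iterable<String> read(); }
interface ConnectionSpec { Connection connect(); }

class ReadWithSharedConnections extends DoFn<ConnectionSpec, String> {
  // Worker-wide (static) cache shared by all DoFn instances in this JVM.
  // Connections unused for a while are closed and dropped, so we avoid both
  // per-element connection churn and connections that linger forever.
  private static final Cache<ConnectionSpec, Connection> CONNECTIONS =
      CacheBuilder.newBuilder()
          .expireAfterAccess(10, TimeUnit.MINUTES)
          .<ConnectionSpec, Connection>removalListener(
              notification -> {
                try {
                  notification.getValue().close();
                } catch (Exception e) {
                  // Best effort: the connection is being discarded anyway.
                }
              })
          .build();

  @ProcessElement
  public void processElement(ProcessContext c) throws ExecutionException {
    // The spec only arrives per element, so @Setup cannot open the
    // connection; instead we look it up (or create it once) in the shared
    // cache so later elements and bundles with the same spec reuse it.
    Connection conn = CONNECTIONS.get(c.element(), () -> c.element().connect());
    for (String record : conn.read()) {
      c.output(record);
    }
  }
}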
