My first idea was to use a connection pool shared across the entire worker, across multiple bundles. The pool would TTL connections that have gone unused. This helps a lot, up until you hit the problem that you don't want every worker connected to every resource, because the work is sharded. In that case we should really be making sure that workers that have processed a given "key" process the same "key" again, without limiting the number of workers that can process a specific key. This is very similar to what we do with a stateful DoFn, but one where the runner knows that it can "shard" the key.

+Siyuan Chen <syc...@google.com> has been investigating something like this for Dataflow, to solve scalability issues with the BigQuery sink, and has been looking into how a better GroupIntoBatches and/or a sharded stateful DoFn could really help in these situations. This applies in general to lots of cases where we want to co-locate things with the same key but not limit the parallel processing to a single worker, as stateful DoFn does today.
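To make the first idea concrete, here is a minimal sketch of a worker-wide pool keyed by connection spec, with TTL-based eviction of idle entries. All class and method names (TtlConnectionPool, acquire, evictIdle) are hypothetical illustrations, not an existing Beam API; a real version would also close the evicted resource.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: one pool per JVM/worker, shared by all bundles.
// Connections are created on demand per spec and dropped once idle past the TTL.
public class TtlConnectionPool<K, C> {
  private static class Entry<C> {
    final C connection;
    volatile long lastUsedMillis;
    Entry(C connection, long now) {
      this.connection = connection;
      this.lastUsedMillis = now;
    }
  }

  private final Map<K, Entry<C>> pool = new ConcurrentHashMap<>();
  private final long ttlMillis;
  private final Function<K, C> factory;

  public TtlConnectionPool(long ttlMillis, Function<K, C> factory) {
    this.ttlMillis = ttlMillis;
    this.factory = factory;
  }

  // Get-or-create the connection for this spec and refresh its idle timestamp.
  public C acquire(K spec) {
    long now = System.currentTimeMillis();
    Entry<C> e = pool.computeIfAbsent(spec, k -> new Entry<>(factory.apply(k), now));
    e.lastUsedMillis = now;
    return e.connection;
  }

  // Remove entries unused for longer than the TTL; a real implementation
  // would close the underlying client here as well. Returns the eviction count.
  public int evictIdle() {
    long cutoff = System.currentTimeMillis() - ttlMillis;
    int evicted = 0;
    for (Iterator<Map.Entry<K, Entry<C>>> it = pool.entrySet().iterator(); it.hasNext(); ) {
      if (it.next().getValue().lastUsedMillis < cutoff) {
        it.remove();
        evicted++;
      }
    }
    return evicted;
  }

  public int size() {
    return pool.size();
  }
}
```

A background thread (or a check on each acquire) would call evictIdle() periodically; the point is that the pool, not the DoFn lifecycle, owns the connections.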
On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com> wrote:
> We have been promoting the use of DoFn to write IO connectors for many
> reasons, including better composability. A common pattern that arises in
> such IOs is that a preceding transform prepares the specification element
> on split that a subsequent DoFn uses to read the data. You can see an
> example of this on FileIO [1] or in RedisIO [2].
>
> The issue is that if we process that spec in the `@ProcessElement` method
> we lose the DoFn lifecycle, because we cannot establish a connection on
> `@Setup` and close it in `@Teardown` given that the spec is per element.
> So we end up re-creating connections, which is a quite costly operation in
> some systems like Cassandra/HBase/etc., and it could end up saturating the
> data store because of the massive creation of connections (something that
> already happened in the past with JdbcIO in the streaming case).
>
> In the ongoing PR that transforms Cassandra to be DoFn based [3] this
> subject appeared again, and we were discussing how to eventually reuse
> connections, maybe by a pretty naive approach of saving a previous
> connection (or set of identified connections) statically so it can be
> reused by multiple DoFn instances. We already had some issues in the past
> because of creating many connections in other IOs (JdbcIO) with streaming
> pipelines, where databases were swamped by massive amounts of connections,
> so reusing connections seems to be something that matters, but at the
> moment we do not have a clear way to do this better.
>
> Anyone have better ideas or recommendations for this scenario?
> Thanks in advance.
>
> Ismaël
>
> [1] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
> [2] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
> [3] https://github.com/apache/beam/pull/10546
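The "pretty naive approach" described above, caching connections statically so multiple DoFn instances in the same JVM can share them, could be sketched as a reference-counted map keyed by the connection spec. A DoFn would retain() in @Setup or @StartBundle and release() in @Teardown; the last release closes the underlying client. All names here (SharedConnections, retain, release) are hypothetical, not an existing Beam API:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of static connection sharing across DoFn instances in one JVM.
// The connection outlives any single DoFn instance; it is closed only when
// the last instance using that spec releases it.
public class SharedConnections {
  // Stand-in for a real client (Cassandra Session, HBase Connection, ...).
  public interface Conn {
    void close();
  }

  private static class Ref {
    final Conn conn;
    int count;
    Ref(Conn conn) {
      this.conn = conn;
    }
  }

  private static final ConcurrentHashMap<String, Ref> CACHE = new ConcurrentHashMap<>();

  // Called from @Setup: creates the connection for this spec on first use,
  // otherwise hands back the shared one and bumps the reference count.
  public static Conn retain(String spec, Function<String, Conn> factory) {
    synchronized (CACHE) {
      Ref ref = CACHE.computeIfAbsent(spec, s -> new Ref(factory.apply(s)));
      ref.count++;
      return ref.conn;
    }
  }

  // Called from @Teardown: the last releaser actually closes the client.
  public static void release(String spec) {
    synchronized (CACHE) {
      Ref ref = CACHE.get(spec);
      if (ref != null && --ref.count == 0) {
        CACHE.remove(spec);
        ref.conn.close();
      }
    }
  }
}
```

The reference counting avoids the main hazard of a plain static field: one DoFn's @Teardown closing a connection that another instance on the same worker is still using.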