On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com> wrote:

> Thanks Luke!
>
> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are facing
> similar issues with some sinks commonly used by Dataflow, such as the
> streaming BigQuery sink. Basically, elements are grouped and batched so
> that subsequent (potentially expensive) operations can be performed on a
> whole batch at once. One problem with the grouping is that it can impose
> a limit on the parallelism of the DoFn performing those operations. To
> mitigate that, I have recently been looking into improving the
> `GroupIntoBatches` transform to allow the grouping to be dynamically
> sharded and therefore distributed - essentially a "shardable" stateful
> DoFn. The transform already does grouping and batching on the input KV -
> grouping on K and batching on V - and it could be extended to shard the
> key and do batching within each shard (meaning that a sharded form of
> the keys would exist somewhere). The idea is detailed in
> https://s.apache.org/sharded-group-into-batches
>
> Along with the proposal, there are two points on which I would like to
> ask for advice:
> - Would there be cases where the sharded keys need to be visible to users?
> One case where that might be needed would be applying another stateful
> DoFn to the sharded output of the GroupIntoBatches, so that the semantics
> of the key-to-user-state mapping are respected.
>

Does exposing an API for the sharded keys change the implementation of the
feature? If it is only an API change, I think it would be best not to
expose the keys to start with, to avoid any unnecessary dependency on the
implementation. Exposing them now seems like it could unnecessarily make
it more difficult to modify the sharding implementation in the future.
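
To make that concrete, here is a minimal sketch (Java) of what the
user-facing side could look like if the shard keys stay internal. The
withShardedKey() knob is purely a placeholder of mine, not an existing
API or anything the proposal commits to:

    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Today: all values for a given key are batched under a single state
    // cell, so per-key parallelism is effectively 1.
    PCollection<KV<String, String>> rows = ...;
    PCollection<KV<String, Iterable<String>>> batches =
        rows.apply(GroupIntoBatches.<String, String>ofSize(500));

    // Hypothetical sharded form: same output type as above, with the
    // sharded keys kept as an implementation detail of the transform.
    PCollection<KV<String, Iterable<String>>> shardedBatches =
        rows.apply(GroupIntoBatches.<String, String>ofSize(500)
            .withShardedKey());  // placeholder name

Keeping the output type unchanged like this is what would let the sharding
implementation evolve without breaking users.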


> - Would there be a need for a per-element shard id, or would a per-bundle
> shard id be sufficient? The former is more general, and we could still
> have the same shard id for all elements in a bundle, but the conclusion
> would potentially affect the implementation (for example, how the
> sharding information should be passed across the FnAPI).
>
>
Are you referring to an API for a pipeline author to get the shard id? I
thought that a bundle isn't a pipeline-author abstraction but an
implementation detail; I may be wrong about this, since I'm not too
familiar with this area of the code. In the proposal it looks like the
shard id isn't exposed. I prefer this, as I'm not sure there is any value
for the user in having a specific 'shard id'. Is there?
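
For what it's worth, one reason a per-bundle shard id may be enough is
that the sharding decision can be made once per bundle and applied to
every element in it. A rough sketch of that idea in Java; AssignShardFn
and numShards are illustrative names of mine, not from the proposal:

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Tags each element with a shard id picked once per bundle, producing
    // a composite (key, shard) that downstream grouping can key on.
    class AssignShardFn<K, V> extends DoFn<KV<K, V>, KV<KV<K, Integer>, V>> {
      private final int numShards;
      private transient int shard;

      AssignShardFn(int numShards) {
        this.numShards = numShards;
      }

      @StartBundle
      public void startBundle() {
        // One shard id per bundle; every element in the bundle shares it.
        shard = ThreadLocalRandom.current().nextInt(numShards);
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        KV<K, V> kv = c.element();
        c.output(KV.of(KV.of(kv.getKey(), shard), kv.getValue()));
      }
    }

A per-element id would subsume this, of course; the sketch just shows that
bundle granularity is already enough to spread a hot key.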



> I'm very new to Beam, so I'm looking forward to hearing thoughts from the
> community. Any comments will be appreciated :)
> --
> Best regards,
> Siyuan
>
>
> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>
>> My first idea was to use a connection pool that is shared across the
>> entire worker, across multiple bundles. The connection pool would expire
>> (TTL) connections that have gone unused. This would help a lot, up until
>> you hit the problem where you don't want every worker connected to every
>> resource because of sharding of the work. In that case we should really
>> be making sure that workers that have processed a given "key" process
>> that "key" again, without limiting the number of workers that can
>> process a specific key. This is very similar to what we do with a
>> stateful DoFn, but one where the runner knows that it can "shard" the
>> key. +Siyuan Chen
>> <syc...@google.com> has been investigating something like this for
>> Dataflow to solve scalability issues with the BigQuery sink and has been
>> looking into how a better GroupIntoBatches and/or a sharded stateful
>> DoFn could really help in these situations. This applies in general to
>> lots of cases where we want to co-locate work for the same key without
>> limiting the parallel processing to only a single worker, like stateful
>> DoFn does today.
>>
>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>
>>> We have been promoting the use of DoFn to write IO connectors for many
>>> reasons, including better composability. A common pattern that arises
>>> in such IOs is that a preceding transform prepares the specification
>>> elements on split, which a subsequent DoFn then uses to read the data.
>>> You can see an example of this in ParquetIO [1] or in SolrIO [2].
>>>
>>> The issue is that if we process that spec in the `@ProcessElement`
>>> method, we lose the DoFn lifecycle: we cannot establish a connection in
>>> `@Setup` and close it in `@Teardown`, because the spec is per element.
>>> So we end up re-creating connections, which is quite a costly operation
>>> in some systems like Cassandra/HBase/etc., and it could end up
>>> saturating the data store because of the massive creation of
>>> connections (something that already happened in the past with JdbcIO in
>>> the streaming case).
>>>
>>> In the ongoing PR that makes Cassandra DoFn based [3], this subject
>>> came up again, and we were discussing how to eventually reuse
>>> connections, maybe via the pretty naive approach of saving a previous
>>> connection (or a set of identified connections) statically so it can be
>>> reused by multiple DoFn instances. We already had issues in the past
>>> with other IOs (JdbcIO) creating many connections in streaming
>>> pipelines, where databases were swamped by massive numbers of
>>> connections. So reusing connections seems to be something that matters,
>>> but at the moment we do not have a clear way to do it better.
>>>
>>> Anyone have better ideas or recommendations for this scenario?
>>> Thanks in advance.
>>>
>>> Ismaël
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>> [2]
>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>> [3] https://github.com/apache/beam/pull/10546
>>
>>
>
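
Coming back to the connection reuse question at the bottom of the thread:
a rough sketch of the worker-level pool idea, using Guava's cache for the
TTL behavior. ConnectionSpec, Connection, and Row are stand-ins for
whatever the IO actually uses, and this assumes ConnectionSpec implements
equals()/hashCode() so it can act as a cache key:

    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalNotification;
    import org.apache.beam.sdk.transforms.DoFn;

    class ReadFn extends DoFn<ConnectionSpec, Row> {
      // Static, so shared by every instance of this DoFn in the worker
      // JVM, across bundles. Connections left idle for a minute are
      // closed and evicted rather than lingering forever.
      private static final Cache<ConnectionSpec, Connection> POOL =
          CacheBuilder.newBuilder()
              .expireAfterAccess(1, TimeUnit.MINUTES)
              .removalListener(
                  (RemovalNotification<ConnectionSpec, Connection> n) ->
                      n.getValue().close())  // assumes close() is unchecked
              .build();

      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        ConnectionSpec spec = c.element();
        // Reuse the connection for this spec if one is cached; otherwise
        // open it once and share it with later elements and bundles.
        Connection conn = POOL.get(spec, () -> Connection.open(spec));
        for (Row row : conn.read()) {
          c.output(row);
        }
      }
    }

Note this only pools within a single JVM: it does not stop every worker
from eventually holding a connection to every resource, which is exactly
the gap the "shard the key" discussion above is about. The Connection also
has to be thread-safe, since multiple bundles can run concurrently on one
worker.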
