Nice doc, by the way; it's concise. Thanks for sharing. I'm excited to
see this feature, particularly the PCollection<T> variant, which would have
been useful for the recently introduced Cloud AI transforms.

On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com> wrote:

>
>
> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com> wrote:
>
>> Thanks Luke!
>>
>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We face
>> similar issues with some sinks commonly used by Dataflow, such as the
>> streaming BigQuery sink. Basically, elements are grouped and batched so
>> that subsequent (potentially expensive) operations can be performed at
>> once on a whole batch of elements. One problem with the grouping is that
>> it can impose a limit on the parallelism of the DoFn performing those
>> operations. To mitigate that limited parallelism, I have recently been
>> looking into improving the `GroupIntoBatches` transform to allow the
>> grouping to be dynamically sharded and therefore distributed - essentially
>> a "shardable" stateful DoFn. The transform already does grouping and
>> batching on the input KV - grouping on K and batching on V - and it could
>> be extended to shard the key and do batching within each shard (meaning
>> that we would have a sharded form of keys somewhere). The idea is detailed
>> in https://s.apache.org/sharded-group-into-batches
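>>
>> To illustrate (a sketch only; the sharded variant in the final comment is
>> a hypothetical API shape, not something that exists in Beam today):
>>
>>   import org.apache.beam.sdk.Pipeline;
>>   import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>   import org.apache.beam.sdk.transforms.Create;
>>   import org.apache.beam.sdk.transforms.GroupIntoBatches;
>>   import org.apache.beam.sdk.values.KV;
>>   import org.apache.beam.sdk.values.PCollection;
>>
>>   public class ShardedBatchingSketch {
>>     public static void main(String[] args) {
>>       Pipeline p =
>>           Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>
>>       PCollection<KV<String, String>> rows =
>>           p.apply(Create.of(KV.of("tableA", "row1"), KV.of("tableA", "row2")));
>>
>>       // Today: all values for a key are batched under that single key, so
>>       // at most one worker at a time can run the expensive per-batch work.
>>       PCollection<KV<String, Iterable<String>>> batches =
>>           rows.apply(GroupIntoBatches.<String, String>ofSize(500));
>>
>>       // With sharding, the runner could split each key into shards and
>>       // batch within each shard, e.g. (hypothetical):
>>       //   rows.apply(GroupIntoBatches.<String, String>ofSize(500).withShardedKey());
>>
>>       p.run().waitUntilFinish();
>>     }
>>   }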
>>
>> Along with the proposal, there are two points on which I would like to
>> ask for advice:
>> - Would there be cases where the sharded keys need to be visible to
>> users? One such case would be applying another stateful DoFn to the
>> sharded output of the GroupIntoBatches, so that the semantics of the
>> key-to-user-state mapping are respected.
>>
>
> Does exposing an API for the sharded keys change the implementation of the
> feature? If it is only an API change, I think it would be best not to
> expose the keys to start with, to avoid any unnecessary dependency on the
> implementation. Exposing them now seems like it could unnecessarily make it
> more difficult to modify the sharding implementation in the future.
>
>
>> - Would there be a need for a per-element shard id, or would a per-bundle
>> shard id be sufficient? The former is more general, and we could still
>> assign the same shard id to all elements in a bundle. But the conclusion
>> could affect the implementation (for example, how the sharding information
>> should be passed across the FnAPI).
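>>
>> To make the question concrete, here is a purely hypothetical shape for a
>> user-visible sharded key (nothing like this exists in Beam today). With a
>> per-element shard id, every element would carry its own (key, shardId)
>> pair; with a per-bundle shard id, the shardId would be fixed for all
>> elements of a bundle:
>>
>>   // Hypothetical, for illustration only.
>>   public final class ShardedKey<K> {
>>     private final K key;       // the user's original key
>>     private final int shardId; // runner-assigned shard within that key
>>
>>     public ShardedKey(K key, int shardId) {
>>       this.key = key;
>>       this.shardId = shardId;
>>     }
>>
>>     public K getKey() { return key; }
>>     public int getShardId() { return shardId; }
>>   }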
>>
>>
> Are you referring to an API for a pipeline author to get the shard id? I
> thought a bundle wasn't a pipeline-author abstraction but an implementation
> detail; I may be wrong here, since I'm not too familiar with this area of
> the code. In the proposal it looks like the shard id isn't exposed. I
> prefer that, as I'm not sure there is any value for the user in having a
> specific 'shard id'. Is there?
>
>
>
>> I'm very new to Beam, so I'm looking forward to hearing thoughts from the
>> community. Any comments will be appreciated :)
>> --
>> Best regards,
>> Siyuan
>>
>>
>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> My first idea was to use a connection pool that is shared across the
>>> entire worker, across multiple bundles. The pool would expire (TTL)
>>> connections that have been unused for a while. This helps a lot, up until
>>> you hit the problem that you don't want every worker connected to every
>>> resource because of how the work is sharded. In that case we should
>>> really be making sure that workers that have processed a given "key"
>>> process the same "key" again, without limiting the number of workers that
>>> can process a specific key. This is very similar to what we do with a
>>> stateful DoFn, but one where the runner knows that it can "shard" the
>>> key. +Siyuan Chen <syc...@google.com> has been investigating something
>>> like this for Dataflow to solve scalability issues with the BigQuery sink
>>> and has been looking into how a better GroupIntoBatches and/or a sharded
>>> stateful DoFn could really help in these situations. This applies in
>>> general to lots of cases where we want to co-locate elements with the
>>> same key without limiting the parallel processing to a single worker, as
>>> stateful DoFn does today.
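>>>
>>> A minimal sketch of that first idea (the Connection type and its methods
>>> are placeholders for a real client, and Guava's cache stands in for a
>>> real pool with TTL):
>>>
>>>   import java.util.concurrent.TimeUnit;
>>>   import com.google.common.cache.Cache;
>>>   import com.google.common.cache.CacheBuilder;
>>>   import org.apache.beam.sdk.transforms.DoFn;
>>>
>>>   class PooledWriteFn extends DoFn<String, Void> {
>>>     /** Placeholder for a real client connection. */
>>>     static class Connection {
>>>       static Connection open(String spec) { return new Connection(); }
>>>       void write(String element) {}
>>>       void close() {}
>>>     }
>>>
>>>     // Static, so the pool is shared by all instances of this DoFn in the
>>>     // worker JVM, across bundles; unused connections expire after the TTL.
>>>     private static final Cache<String, Connection> POOL =
>>>         CacheBuilder.newBuilder()
>>>             .expireAfterAccess(5, TimeUnit.MINUTES)
>>>             .<String, Connection>removalListener(n -> n.getValue().close())
>>>             .build();
>>>
>>>     private final String spec; // connection spec, fixed at construction
>>>
>>>     PooledWriteFn(String spec) {
>>>       this.spec = spec;
>>>     }
>>>
>>>     @ProcessElement
>>>     public void processElement(@Element String element) throws Exception {
>>>       // Reuse the connection for this spec, or create it on first use.
>>>       Connection conn = POOL.get(spec, () -> Connection.open(spec));
>>>       conn.write(element);
>>>     }
>>>   }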
>>>
>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>
>>>> We have been promoting the use of DoFn to write IO connectors for many
>>>> reasons, including better composability. A common pattern that arises in
>>>> such IOs is that a preceding transform prepares specification elements on
>>>> split, which a subsequent DoFn then uses to read the data. You can see an
>>>> example of this in ParquetIO [1] or in SolrIO [2].
>>>>
>>>> The issue is that if we process that spec in the `@ProcessElement`
>>>> method, we lose the DoFn lifecycle: because the spec is per element, we
>>>> cannot establish a connection in `@Setup` and close it in `@Teardown`.
>>>> So we end up re-creating connections, which is quite a costly operation
>>>> in some systems like Cassandra/HBase/etc., and the massive creation of
>>>> connections could end up saturating the data store (something that has
>>>> already happened in the past with JdbcIO in the streaming case).
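>>>>
>>>> To illustrate the shape of the problem (every type here is a
>>>> placeholder, not a real IO):
>>>>
>>>>   import org.apache.beam.sdk.transforms.DoFn;
>>>>
>>>>   class ReadFromSpecFn extends DoFn<String, String> {
>>>>     /** Placeholder for a real client connection. */
>>>>     static class Connection implements AutoCloseable {
>>>>       static Connection open(String spec) { return new Connection(); }
>>>>       Iterable<String> read() { return java.util.Collections.emptyList(); }
>>>>       @Override public void close() {}
>>>>     }
>>>>
>>>>     @Setup
>>>>     public void setup() {
>>>>       // Nothing can be connected here: the spec only arrives per element.
>>>>     }
>>>>
>>>>     @ProcessElement
>>>>     public void process(@Element String spec, OutputReceiver<String> out) {
>>>>       // The connection is created (and torn down) for every element,
>>>>       // which is costly for systems like Cassandra or HBase.
>>>>       try (Connection conn = Connection.open(spec)) {
>>>>         for (String record : conn.read()) {
>>>>           out.output(record);
>>>>         }
>>>>       }
>>>>     }
>>>>   }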
>>>>
>>>> In the ongoing PR that makes the Cassandra IO DoFn based [3], this
>>>> subject came up again, and we were discussing how to eventually reuse
>>>> connections, maybe with a pretty naive approach of saving a previous
>>>> connection (or a set of identified connections) statically so it can be
>>>> reused by multiple DoFn instances. We have already had issues in the
>>>> past with creating too many connections in other IOs (JdbcIO) in
>>>> streaming pipelines, where databases were swamped by massive numbers of
>>>> connections, so reusing connections seems to be something that matters,
>>>> but at the moment we do not have a clear way to do it better.
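>>>>
>>>> As a naive sketch of that static approach (the Connection type is again
>>>> a placeholder, and when to close the shared connections remains an open
>>>> question, since no single DoFn instance owns them):
>>>>
>>>>   import java.util.concurrent.ConcurrentHashMap;
>>>>   import java.util.concurrent.ConcurrentMap;
>>>>
>>>>   class ConnectionRegistry {
>>>>     /** Placeholder for a real client connection. */
>>>>     static class Connection {
>>>>       static Connection open(String spec) { return new Connection(); }
>>>>     }
>>>>
>>>>     // Keyed by spec and static, so every DoFn instance in the worker JVM
>>>>     // that sees the same spec shares one connection.
>>>>     private static final ConcurrentMap<String, Connection> CONNECTIONS =
>>>>         new ConcurrentHashMap<>();
>>>>
>>>>     static Connection getOrCreate(String spec) {
>>>>       return CONNECTIONS.computeIfAbsent(spec, Connection::open);
>>>>     }
>>>>   }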
>>>>
>>>> Anyone have better ideas or recommendations for this scenario?
>>>> Thanks in advance.
>>>>
>>>> Ismaël
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>> [2]
>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>> [3] https://github.com/apache/beam/pull/10546
>>>
>>>
>>
>
