I'm not sure that runner-determined sharding/GroupIntoBatches applies to
splittable DoFns. A splittable DoFn is about taking one element with a high
fan-out / high-cost function and breaking it down into smaller pieces, while
runner-determined sharding/GroupIntoBatches is about taking a lot of small
elements and doing something with all of them together at once (e.g. a
service call).
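
To make the contrast concrete, here is a rough Java fragment of the
GroupIntoBatches side of that picture (purely illustrative: Request,
Response, serviceClient and the batch size are placeholders):

  // Batch many small keyed elements and make one service call per batch
  // instead of one call per element.
  PCollection<KV<String, Request>> requests = ...;
  requests
      .apply(GroupIntoBatches.<String, Request>ofSize(100))
      .apply(ParDo.of(
          new DoFn<KV<String, Iterable<Request>>, Response>() {
            @ProcessElement
            public void process(
                @Element KV<String, Iterable<Request>> batch,
                OutputReceiver<Response> out) {
              // One (expensive) call for the whole batch.
              for (Response response : serviceClient.send(batch.getValue())) {
                out.output(response);
              }
            }
          }));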


On Mon, Jun 29, 2020 at 3:00 PM Siyuan Chen <syc...@google.com> wrote:

>
> --
> Best regards,
> Siyuan
>
>
> On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Great doc.
>>
>> The extended API makes sense. I like how it removes a knob. The question
>> that I have to ask is whether there is a core model change here or whether
>> we can avoid it. Defining "shard" as a scope of state within which
>> execution is observably serial, today the model always has key+window
>> sharding. It seems like the two options mentioned are:
>>
>>  - new primitive way for runner to inject shard keys, so standard
>> stateful ParDo works
>>  - new primitive stateful ParDo with runner-defined scope of state that
>> is not observable to users
>>
>> Seems on the doc that the latter is favored. I like it better, too. I do
>> not know how dynamic resharding would work for either of these. With
>> dynamic resharding the latter seems related to SDF.
>>
> The runner-side implementation can be the same for both cases. One way
> would be to generate a shard number for a given input key when the data is
> emitted by an upstream transform and passed back to the runner; the runner
> can then distribute the data to the corresponding shard for the
> GroupIntoBatches processing. It's dynamic in that data with the same key
> emitted at different times can be tagged with different shard numbers. SDF
> is mainly for sources AFAIU, so it is different (I'm not familiar with it,
> so correct me if I am wrong :p).
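
Purely to illustrate that idea (this is not real runner code, and every name
below is made up): the logical key is effectively expanded into a (key,
shard) pair whose shard component can change over time.

  import java.util.concurrent.ThreadLocalRandom;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Hypothetical sketch: expand key K into (K, shardId) so batches for one
  // key can be built on several shards/workers concurrently. A real runner
  // would pick the shard based on load rather than at random.
  class AssignShard<K, V> extends DoFn<KV<K, V>, KV<KV<K, Integer>, V>> {
    private final int maxShardsPerKey;

    AssignShard(int maxShardsPerKey) {
      this.maxShardsPerKey = maxShardsPerKey;
    }

    @ProcessElement
    public void process(
        @Element KV<K, V> element,
        OutputReceiver<KV<KV<K, Integer>, V>> out) {
      int shard = ThreadLocalRandom.current().nextInt(maxShardsPerKey);
      out.output(KV.of(KV.of(element.getKey(), shard), element.getValue()));
    }
  }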
>
>>
>> Suppose we say that the semantics of the URN+payload "GroupIntoBatch {
>> maxSize = <n> }" allows any segmentation of the PCollection into batches of
>> size <= n. The runner can replace it with the proposed implementation. But
>> what does the default implementation look like? If the default
>> implementation is not good (like, say, "reshuffle") then maybe figuring out
>> the primitive and adding it is better than leaving a transform that is sort
>> of a hack when not replaced.
>>
>
> It's a good point that we should think more carefully when extending the
> API. The default implementation of `ofSize()` or other similar limits like
> `ofBytes()` would behave like a normal stateful DoFn. (`ofMaxSize` sounds
> similar to `ofSize()` since `ofSize()` guarantees emitting a fixed number
> of elements whenever possible and emitting a partial result otherwise.)
> Enabling runner-determined sharding on a runner that does not support
> dynamic sharding would fall back to the current (default) implementation,
> so there would be no behavioral changes compared to today. Allowing the
> transform to accept unkeyed input might have to use a naive sharding
> though, e.g., pairing each element with a random key.
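
A rough fragment of what that naive fallback could look like (numShards, the
batch size and the element type are placeholders, not part of the proposal):

  // Pair each element of an unkeyed PCollection with a random key so the
  // existing keyed GroupIntoBatches can be applied; numShards bounds how
  // widely the batching is spread.
  final int numShards = 16;
  PCollection<String> unkeyed = ...;
  unkeyed
      .apply(
          WithKeys.<Integer, String>of(
                  e -> ThreadLocalRandom.current().nextInt(numShards))
              .withKeyType(TypeDescriptors.integers()))
      .apply(GroupIntoBatches.<Integer, String>ofSize(100));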
>
>>
>> Kenn
>>
>> On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <lc...@google.com> wrote:
>>
>>>
>>>
>>> On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <tyso...@google.com>
>>> wrote:
>>>
>>>> Nice doc by the way, it's concise. Thanks for sharing and I'm excited
>>>> to see this feature, particularly the PCollection<T> variant that would
>>>> have been useful for the Cloud AI transforms recently introduced.
>>>>
>>>> On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Luke!
>>>>>>
>>>>>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are
>>>>>> faced with similar issues with some sinks commonly used by Dataflow such 
>>>>>> as
>>>>>> the streaming BigQuery sink. Basically the elements are grouped and 
>>>>>> batched
>>>>>> so some following operations (potentially expensive) can be performed at
>>>>>> once on a batch of elements. One problem with the grouping is that it 
>>>>>> could
>>>>>> impose a limit on the parallelism of the DoFn performing those 
>>>>>> operations.
>>>>>> To mitigate the limited parallelism problem, recently I have been looking
>>>>>> into the idea of improving the `GroupIntoBatches` transform to allow the
>>>>>> grouping to be dynamically sharded and therefore distributed - 
>>>>>> essentially
>>>>>> a "shardable" stateful DoFn. The transform already does grouping and
>>>>>> batching on the input KV - grouping on K and batching on V - and it could
>>>>>> be extended to be able to shard the key and do batching within each shard
>>>>>> (meaning that we would have a sharded form of keys somewhere). The idea 
>>>>>> is
>>>>>> detailed in https://s.apache.org/sharded-group-into-batches
>>>>>>
>>>>>> Along with the proposal, there are two points I would like to ask for
>>>>>> advice:
>>>>>> - Would there be cases where the sharded keys need to be visible to
>>>>>> users? One case where that might be needed would be to apply another
>>>>>> stateful DoFn to the sharded output of the GroupIntoBatches, so the
>>>>>> semantics of key-to-user state mapping is respected.
>>>>>>
>>>>>
>>>>> Does exposing an API for the sharded keys change the implementation of
>>>>> the feature? If it is only an API change, I think it would be best to
>>>>> avoid exposing the keys to start with, to avoid any unnecessary
>>>>> dependency on the implementation. It seems like it could unnecessarily
>>>>> make it more difficult to modify the sharding implementation in the
>>>>> future.
>>>>>
>>>>
>>> Exposing the shard id makes it such that the shard id must cross the
>>> portability APIs during execution. If we don't expose it then it could be
>>> implemented completely within the runner and all the SDK has to do is say
>>> that this stateful transform supports sharding or this transform is the
>>> GroupIntoBatches transform.
>>>
>>>
>>>>>> - Would there be a need to have a per-element shard id, or would a
>>>>>> per-bundle shard id be sufficient? The former is more general and we
>>>>>> could still have the same shard id for all elements in a bundle. But
>>>>>> the conclusion would potentially affect the implementation approach
>>>>>> (like how the sharding information should be passed across the FnAPI,
>>>>>> for example).
>>>>>>
>>>>>>
>>>>>>
>>>>> Are you referring to an API for a pipeline author to get the shard id?
>>>>> I thought that a bundle isn't a pipeline-author abstraction but an
>>>>> implementation detail; I may be wrong about this since I'm not too
>>>>> familiar with this area of the code. In the proposal it looks like the
>>>>> shard id isn't exposed, which I prefer, as I'm not sure there is any
>>>>> value for the user in having a specific 'shard id'. Is there?
>>>>>
>>>>
>>> A bundle is exposed to a pipeline author since they regularly want to
>>> know the lifetime of a bundle (e.g. startBundle/finishBundle on DoFn) in
>>> order to amortize setup/teardown at the bundle level.
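
For anyone following along, a small illustrative DoFn of that bundle
lifecycle (FakeClient and the buffering are placeholders, not a real API):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.DoFn;

  // Per-bundle work is amortized in @StartBundle/@FinishBundle, while
  // per-instance work stays in @Setup/@Teardown.
  class BufferingWriteFn extends DoFn<String, Void> {
    private transient FakeClient client;   // lives for the DoFn instance
    private transient List<String> buffer; // lives for one bundle

    @Setup
    public void setup() {
      client = FakeClient.connect();
    }

    @StartBundle
    public void startBundle() {
      buffer = new ArrayList<>();
    }

    @ProcessElement
    public void process(@Element String element) {
      buffer.add(element);
    }

    @FinishBundle
    public void finishBundle() {
      client.writeAll(buffer); // one flush per bundle, not one per element
    }

    @Teardown
    public void teardown() {
      client.close();
    }
  }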
>>>
>>> Whether to expose the shard id is the big question, and whether there is
>>> a use case for it. I hope the community can provide guidance here;
>>> otherwise I'm with you and also agree that exposing it isn't necessary.
>>>
>>>
>>>>
>>>>>
>>>>>> I'm very new to Beam, so I'm looking forward to hearing thoughts from
>>>>>> the community. Any comments will be appreciated :)
>>>>>> --
>>>>>> Best regards,
>>>>>> Siyuan
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> My first idea was to use a connection pool that is shared across the
>>>>>>> entire worker across multiple bundles. The connection pool would TTL
>>>>>>> connections that have been unused. This would help a bunch up until you 
>>>>>>> hit
>>>>>>> the problem where you don't want every worker connected to every 
>>>>>>> resource
>>>>>>> because of sharding of the work. In this case we should really be making
>>>>>>> sure that workers that have processed the same "key" process the same 
>>>>>>> "key"
>>>>>>> again without limiting the number of workers that can process a specific
>>>>>>> key. This is very similar to what we do with a stateful DoFn but one 
>>>>>>> where
>>>>>>> the runner knows that it can "shard" the key. +Siyuan Chen
>>>>>>> <syc...@google.com> has been investigating something like this for
>>>>>>> Dataflow to solve scalability issues with the BigQuery sink and has been
>>>>>>> looking into how a better GroupIntoBatches and/or sharded stateful DoFn
>>>>>>> could really help in these situations. This applies in general to lots 
>>>>>>> of
>>>>>>> things where we want to co-locate things with the same key but not limit
>>>>>>> the parallel processing to only a single worker like stateful DoFn does
>>>>>>> today.
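
A rough sketch of that first idea, assuming Guava's cache for the TTL
behaviour (Connection and Mutation are placeholder types, and the keying and
timeout are arbitrary):

  import java.util.concurrent.TimeUnit;
  import com.google.common.cache.CacheBuilder;
  import com.google.common.cache.CacheLoader;
  import com.google.common.cache.LoadingCache;
  import com.google.common.cache.RemovalListener;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Worker-wide pool shared across DoFn instances and bundles; connections
  // unused for 10 minutes are evicted and closed.
  class PooledWriteFn extends DoFn<KV<String, Mutation>, Void> {
    private static final LoadingCache<String, Connection> POOL = buildPool();

    private static LoadingCache<String, Connection> buildPool() {
      RemovalListener<String, Connection> closeOnEvict =
          notification -> notification.getValue().close();
      return CacheBuilder.newBuilder()
          .expireAfterAccess(10, TimeUnit.MINUTES)
          .removalListener(closeOnEvict)
          .build(CacheLoader.from(Connection::open));
    }

    @ProcessElement
    public void process(@Element KV<String, Mutation> element) throws Exception {
      // Reuse (or lazily create) the connection for this resource key.
      POOL.get(element.getKey()).write(element.getValue());
    }
  }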
>>>>>>>
>>>>>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> We have been promoting the use of DoFn to write IO connectors for
>>>>>>>> many reasons, including better composability. A common pattern in
>>>>>>>> such IOs is that a preceding transform prepares a specification
>>>>>>>> element during splitting, which a subsequent DoFn then uses to read
>>>>>>>> the data. You can see an example of this in FileIO [1] or in
>>>>>>>> RedisIO [2].
>>>>>>>>
>>>>>>>> The issue is that if we process that spec in the `@ProcessElement`
>>>>>>>> method we lose the DoFn lifecycle: we cannot establish a connection
>>>>>>>> in `@Setup` and close it in `@Teardown` because the spec is per
>>>>>>>> element. So we end up re-creating connections, which is a quite
>>>>>>>> costly operation in some systems like Cassandra/HBase/etc., and it
>>>>>>>> could end up saturating the data store because of the massive
>>>>>>>> creation of connections (something that already happened in the
>>>>>>>> past with JdbcIO in the streaming case).
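
The pattern being described looks roughly like this (ConnectionSpec,
Connection and Record are illustrative, not from any particular IO):

  import org.apache.beam.sdk.transforms.DoFn;

  // The connection spec only arrives as an element, so @Setup/@Teardown
  // cannot open/close the connection; it ends up being created per element
  // (or at best per bundle).
  class ReadFn extends DoFn<ConnectionSpec, Record> {
    @ProcessElement
    public void process(@Element ConnectionSpec spec, OutputReceiver<Record> out)
        throws Exception {
      try (Connection connection = Connection.open(spec)) { // costly, repeated
        for (Record record : connection.read()) {
          out.output(record);
        }
      }
    }
  }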
>>>>>>>>
>>>>>>>> In the ongoing PR that transforms Cassandra to be DoFn based [3]
>>>>>>>> this subject appeared again, and we were discussing how to
>>>>>>>> eventually reuse connections, maybe with a pretty naive approach of
>>>>>>>> saving a previous connection (or a set of identified connections)
>>>>>>>> statically so it can be reused by multiple DoFn instances. We
>>>>>>>> already had some issues in the past because of creating many
>>>>>>>> connections in other IOs (JdbcIO) with streaming pipelines, where
>>>>>>>> databases were swamped by massive numbers of connections. So
>>>>>>>> reusing connections seems to be something that matters, but at the
>>>>>>>> moment we do not have a clear way to do this better.
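
A minimal sketch of that naive static-reuse idea (all names illustrative):
connections live in a static map keyed by the spec so that DoFn instances in
the same worker JVM can share them; something like the TTL eviction sketched
earlier in the thread would still be needed to eventually close idle ones.

  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
  import org.apache.beam.sdk.transforms.DoFn;

  class CachingReadFn extends DoFn<ConnectionSpec, Record> {
    // Shared across DoFn instances on the same worker JVM; assumes
    // ConnectionSpec has a value-based equals()/hashCode().
    private static final ConcurrentMap<ConnectionSpec, Connection> CONNECTIONS =
        new ConcurrentHashMap<>();

    @ProcessElement
    public void process(@Element ConnectionSpec spec, OutputReceiver<Record> out) {
      Connection connection = CONNECTIONS.computeIfAbsent(spec, Connection::open);
      for (Record record : connection.read()) {
        out.output(record);
      }
      // Intentionally left open for reuse; deciding when to close it is the hard part.
    }
  }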
>>>>>>>>
>>>>>>>> Anyone have better ideas or recommendations for this scenario?
>>>>>>>> Thanks in advance.
>>>>>>>>
>>>>>>>> Ismaël
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>>>>>> [2]
>>>>>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>>>>>> [3] https://github.com/apache/beam/pull/10546
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
