I agree at the level of GroupIntoBatches. The part that is similar is if
you implement GroupIntoBatches with a new primitive supporting
runner-scoped state where the state is per-shard and the runner is able to
split state shards.

Kenn

On Tue, Jun 30, 2020 at 9:22 AM Luke Cwik <lc...@google.com> wrote:

> I'm not sure that runner determined sharding/GroupIntoBatches applies to
> splittable DoFns. Splittable DoFns are about taking one element and having
> a high fan-out / high cost function broken down into smaller pieces, while
> runner determined sharding/GroupIntoBatches is about taking a lot of small
> elements and doing something with all of them together at once (e.g. a
> service call).
>
>
> On Mon, Jun 29, 2020 at 3:00 PM Siyuan Chen <syc...@google.com> wrote:
>
>>
>> --
>> Best regards,
>> Siyuan
>>
>>
>> On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Great doc.
>>>
>>> The extended API makes sense. I like how it removes a knob. The question
>>> that I have to ask is whether there is a core model change here or can we
>>> avoid it. Defining "shard" as a scope of state within which execution is
>>> observably serial, today the model has key+window sharding always. It seems
>>> like two options mentioned are:
>>>
>>>  - new primitive way for runner to inject shard keys, so standard
>>> stateful ParDo works
>>>  - new primitive stateful ParDo with runner-defined scope of state that
>>> is not observable to users
>>>
>>> Seems on the doc that the latter is favored. I like it better, too. I do
>>> not know how dynamic resharding would work for either of these. With
>>> dynamic resharding the latter seems related to SDF.
>>>
>> The runner-side implementation can be the same for both cases. One way
>> would be to generate a shard number for a given input key when the data is
>> emitted by an upstream transform and passed back to the runner, and then
>> the runner can distribute the data to the corresponding shard for
>> processing GroupIntoBatches. It's dynamic in that the data with the same
>> key emitted at different times can be tagged with different shard numbers.
>> SDF is mainly for sources AFAIU so is different (not familiar with that so
>> correct me if I am wrong :p).
>>
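The tagging step described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `ShardedKey` and `ShardAssigner` are hypothetical names, not Beam or Dataflow APIs, and round-robin is just one possible policy a runner might use. The point it shows is that the same key can be tagged with different shard numbers at different times, and that the shard count can change between assignments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these names are Beam or runner APIs.
final class ShardedKey {
    final String key;
    final int shard;

    ShardedKey(String key, int shard) {
        this.key = key;
        this.shard = shard;
    }

    @Override
    public String toString() {
        return key + "#" + shard;
    }
}

// Tags each emitted element's key with a shard number, round-robin per key.
final class ShardAssigner {
    private final Map<String, Integer> emittedPerKey = new HashMap<>();
    private int numShards;

    ShardAssigner(int numShards) {
        setNumShards(numShards);
    }

    // The runner may change the shard count at any time (assumed policy hook).
    void setNumShards(int numShards) {
        if (numShards < 1) {
            throw new IllegalArgumentException("numShards must be >= 1");
        }
        this.numShards = numShards;
    }

    // Returns the key tagged with the shard that should process this element.
    ShardedKey assign(String key) {
        int index = emittedPerKey.merge(key, 1, Integer::sum) - 1;
        return new ShardedKey(key, Math.floorMod(index, numShards));
    }
}
```

With two shards, successive elements of one key alternate between shard 0 and shard 1; lowering the count to one sends everything emitted afterwards to shard 0.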
>>>
>>> Suppose we say that the semantics of the URN+payload "GroupIntoBatch {
>>> maxSize = <n> }" allows any segmentation of the PCollection into batches of
>>> size <= n. The runner can replace it with the proposed implementation. But
>>> what does the default implementation look like? If the default
>>> implementation is not good (like, say, "reshuffle") then maybe figuring out
>>> the primitive and adding it is better than leaving a transform that is sort
>>> of a hack when not replaced.
>>>
>>
>> It's a good point that we should think more carefully when extending the
>> API. The default implementation of `ofSize()` or other similar limits like
>> `ofBytes()` would behave like a normal stateful DoFn. (`ofMaxSize` sounds
>> similar to `ofSize()` since ofSize() guarantees emitting a fixed number of
>> elements whenever possible and emitting a partial result otherwise.)
>> Enabling runner determined sharding on a runner that does not support
>> dynamic sharding would fall back to the current (default) implementation,
>> so there would be no behavioral change from today. Allowing the transform
>> to accept unkeyed input might have to use naive sharding though, e.g.,
>> pairing each element with a random key.
>>
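As a rough illustration of the `ofSize()` behavior described above (a full batch whenever possible, a partial one otherwise), here is a minimal in-memory sketch. `BatchBuffer` is a hypothetical name, not the actual transform: the real `GroupIntoBatches` keeps its buffer in per-key state, and `flush()` here only stands in for whatever event (assumed to be something like the end of a window) forces out the remainder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of size-limited batching for a single key or shard.
final class BatchBuffer<T> {
    private final int batchSize;
    private List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
    }

    // Buffers the element; returns a full batch once batchSize is reached.
    Optional<List<T>> add(T element) {
        buffer.add(element);
        if (buffer.size() < batchSize) {
            return Optional.empty();
        }
        return Optional.of(drain());
    }

    // Emits whatever is left as a partial batch (possibly empty).
    List<T> flush() {
        return drain();
    }

    private List<T> drain() {
        List<T> out = buffer;
        buffer = new ArrayList<>();
        return out;
    }
}
```

One such buffer per sharded key would give batching within each shard; for unkeyed input, the naive fallback mentioned above amounts to choosing the buffer by a random key.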
>>>
>>> Kenn
>>>
>>> On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <tyso...@google.com>
>>>> wrote:
>>>>
>>>>> Nice doc by the way, it's concise. Thanks for sharing and I'm excited
>>>>> to see this feature, particularly the PCollection<T> variant that would
>>>>> have been useful for the Cloud AI transforms recently introduced.
>>>>>
>>>>> On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Luke!
>>>>>>>
>>>>>>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are
>>>>>>> faced with similar issues with some sinks commonly used by Dataflow
>>>>>>> such as the streaming BigQuery sink. Basically the elements are grouped
>>>>>>> and batched so some following operations (potentially expensive) can be
>>>>>>> performed at once on a batch of elements. One problem with the grouping
>>>>>>> is that it could impose a limit on the parallelism of the DoFn
>>>>>>> performing those operations. To mitigate the limited parallelism
>>>>>>> problem, recently I have been looking into the idea of improving the
>>>>>>> `GroupIntoBatches` transform to allow the grouping to be dynamically
>>>>>>> sharded and therefore distributed - essentially a "shardable" stateful
>>>>>>> DoFn. The transform already does grouping and batching on the input KV -
>>>>>>> grouping on K and batching on V - and it could be extended to be able to
>>>>>>> shard the key and do batching within each shard (meaning that we would
>>>>>>> have a sharded form of keys somewhere). The idea is detailed in
>>>>>>> https://s.apache.org/sharded-group-into-batches
>>>>>>>
>>>>>>> Along with the proposal, there are two points I would like to ask
>>>>>>> for advice:
>>>>>>> - Would there be cases where the sharded keys need to be visible to
>>>>>>> users? One case where that might be needed would be to apply another
>>>>>>> stateful DoFn to the sharded output of the GroupIntoBatches, so the
>>>>>>> semantics of key-to-user state mapping is respected.
>>>>>>>
>>>>>>
>>>>>> Does exposing an API for the sharded keys change the implementation
>>>>>> of the feature? If it is only an API change I think it would be best to
>>>>>> avoid exposing the keys to start with to avoid any unnecessary dependency
>>>>>> on the implementation. It seems like it could make it more difficult to
>>>>>> modify the sharding implementation in the future unnecessarily at this
>>>>>> point.
>>>>>>
>>>>>
>>>> Exposing the shard id makes it such that the shard id must cross the
>>>> portability APIs during execution. If we don't expose it then it could be
>>>> implemented completely within the runner and all the SDK has to do is say
>>>> that this stateful transform supports sharding or this transform is the
>>>> GroupIntoBatches transform.
>>>>
>>>>
>>>>>>> - Would there be a need for a per-element shard id, or would a
>>>>>>> per-bundle shard id be sufficient? The former is more general and we
>>>>>>> could still have the same shard id for all elements in a bundle. But
>>>>>>> the conclusion would potentially affect the way of implementation (like
>>>>>>> how the sharding information should be passed across FnAPI for example).
>>>>>>>
>>>>>>>
>>>>>> Are you referring to an API for a pipeline author to get the shard
>>>>>> id? I thought that a bundle isn't a pipeline author abstraction but an
>>>>>> implementation detail, I may be wrong in this since I'm not too familiar
>>>>>> with this area of code. In the proposal it looks like the shard id isn't
>>>>>> exposed, I prefer this, as I'm not sure there is any value for the user 
>>>>>> in
>>>>>> having a specific 'shard id'. Is there?
>>>>>>
>>>>>
>>>> A bundle is exposed to a pipeline author since they regularly want to
>>>> know the lifetime (e.g. startBundle/finishBundle on DoFn) of a bundle to
>>>> amortize setup/teardown at the bundle level.
>>>>
>>>> Whether to expose the shard id is the big question, along with whether
>>>> there is a use case for it. I hope the community can provide guidance
>>>> here; otherwise I'm with you and also agree that exposing it isn't
>>>> necessary.
>>>>
>>>>
>>>>>
>>>>>>
>>>>>>> I'm very new to Beam so looking forward to hearing the thoughts from
>>>>>>> the community. Any comments will be appreciated :)
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> My first idea was to use a connection pool that is shared across
>>>>>>>> the entire worker across multiple bundles. The connection pool would
>>>>>>>> TTL connections that have been unused. This would help a bunch up
>>>>>>>> until you hit the problem where you don't want every worker connected
>>>>>>>> to every resource because of sharding of the work. In this case we
>>>>>>>> should really be making sure that workers that have processed the same
>>>>>>>> "key" process the same "key" again without limiting the number of
>>>>>>>> workers that can process a specific key. This is very similar to what
>>>>>>>> we do with a stateful DoFn but one where the runner knows that it can
>>>>>>>> "shard" the key. +Siyuan Chen <syc...@google.com> has been
>>>>>>>> investigating something like this for Dataflow to solve scalability
>>>>>>>> issues with the BigQuery sink and has been looking into how a better
>>>>>>>> GroupIntoBatches and/or sharded stateful DoFn could really help in
>>>>>>>> these situations. This applies in general to lots of things where we
>>>>>>>> want to co-locate things with the same key but not limit the parallel
>>>>>>>> processing to only a single worker like stateful DoFn does today.
>>>>>>>>
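A minimal sketch of the worker-wide pool idea above, assuming one connection per spec and eviction based purely on idle time. `IdleTtlPool` and everything in it are hypothetical and not part of Beam. The clock is injected so the expiry logic can be exercised without sleeping. The sketch does not track in-flight use, so a connection held longer than the TTL could be closed underneath its user; a real pool would need reference counting or leases.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch: one shared connection per spec, closed after idling.
final class IdleTtlPool<S, C extends AutoCloseable> {
    private static final class Entry<C> {
        final C connection;
        long lastUsedMillis;

        Entry(C connection) {
            this.connection = connection;
        }
    }

    private final Map<S, Entry<C>> entries = new HashMap<>();
    private final Function<S, C> connect;
    private final long ttlMillis;
    private final LongSupplier clock;

    IdleTtlPool(Function<S, C> connect, long ttlMillis, LongSupplier clock) {
        this.connect = connect;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns the shared connection for the spec, opening one if needed.
    synchronized C acquire(S spec) {
        long now = clock.getAsLong();
        evictIdle(now);
        Entry<C> entry = entries.get(spec);
        if (entry == null) {
            entry = new Entry<>(connect.apply(spec));
            entries.put(spec, entry);
        }
        entry.lastUsedMillis = now;
        return entry.connection;
    }

    synchronized int size() {
        return entries.size();
    }

    // Closes and drops connections unused for longer than the TTL.
    private void evictIdle(long now) {
        Iterator<Entry<C>> it = entries.values().iterator();
        while (it.hasNext()) {
            Entry<C> entry = it.next();
            if (now - entry.lastUsedMillis > ttlMillis) {
                it.remove();
                try {
                    entry.connection.close();
                } catch (Exception ignored) {
                    // Best effort: the entry is dropped either way.
                }
            }
        }
    }
}
```

Held in a static field, one such pool would be shared by every DoFn instance in the worker's JVM, which is what lets it outlive individual bundles.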
>>>>>>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> We have been promoting the use of DoFn to write IO connectors for
>>>>>>>>> many reasons including better composability. A common pattern that
>>>>>>>>> arises in such IOs is that a preceding transform prepares the
>>>>>>>>> specification element on split that a subsequent DoFn uses to read
>>>>>>>>> the data. You can see an example of this on FileIO [1] or in
>>>>>>>>> RedisIO [2].
>>>>>>>>>
>>>>>>>>> The issue is that if we process that spec in the `@ProcessElement`
>>>>>>>>> method we lose the DoFn lifecycle, because we cannot establish a
>>>>>>>>> connection on `@Setup` and close it in `@Teardown` since the spec is
>>>>>>>>> per element. So we end up recreating connections, which is quite a
>>>>>>>>> costly operation in some systems like Cassandra/HBase/etc, and it
>>>>>>>>> could end up saturating the data store because of the massive
>>>>>>>>> creation of connections (something that already happened in the past
>>>>>>>>> with JdbcIO in the streaming case).
>>>>>>>>>
>>>>>>>>> In the ongoing PR that transforms Cassandra to be DoFn based [3]
>>>>>>>>> this subject appeared again, and we were discussing how to eventually
>>>>>>>>> reuse connections, maybe by a pretty naive approach of saving a
>>>>>>>>> previous connection (or set of identified connections) statically so
>>>>>>>>> it can be reused by multiple DoFn instances. We already had some
>>>>>>>>> issues in the past because of creating many connections on other IOs
>>>>>>>>> (JdbcIO) with streaming pipelines where databases were swamped by
>>>>>>>>> massive amounts of connections, so reusing connections seems to be
>>>>>>>>> something that matters, but at the moment we do not have a clear way
>>>>>>>>> to do this better.
>>>>>>>>>
>>>>>>>>> Anyone have better ideas or recommendations for this scenario?
>>>>>>>>> Thanks in advance.
>>>>>>>>>
>>>>>>>>> Ismaël
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>>>>>>> [2]
>>>>>>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>>>>>>> [3] https://github.com/apache/beam/pull/10546
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
