--
Best regards,
Siyuan

On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <k...@apache.org> wrote:

> Great doc.
>
> The extended API makes sense. I like how it removes a knob. The question
> I have to ask is whether there is a core model change here, or whether we
> can avoid one. Defining "shard" as a scope of state within which execution
> is observably serial, today the model always has key+window sharding. It
> seems like the two options mentioned are:
>
>  - new primitive way for runner to inject shard keys, so standard stateful
> ParDo works
>  - new primitive stateful ParDo with runner-defined scope of state that is
> not observable to users
>
> It seems from the doc that the latter is favored. I like it better, too.
> I do not know how dynamic resharding would work for either of these. With
> dynamic resharding, the latter seems related to SDF.
>
The runner-side implementation can be the same for both cases. One way
would be to generate a shard number for a given input key when the data is
emitted by an upstream transform and passed back to the runner; the runner
can then distribute the data to the corresponding shard for processing by
GroupIntoBatches. It's dynamic in that data with the same key emitted at
different times can be tagged with different shard numbers. SDF is mainly
for sources AFAIU, so it seems like a different mechanism (I'm not familiar
with it, so correct me if I'm wrong :p).
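
To make the shard-number idea concrete, here is a minimal sketch of the
tagging step. All names below are made up for illustration; in practice
this would live inside the runner, not in any SDK-visible API:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.values.KV;

/**
 * Illustrative only: a runner-internal helper that pairs a user key with a
 * shard number chosen at emission time. Not a real Beam API.
 */
class ShardAssigner {
  private final int maxShardsPerKey;

  ShardAssigner(int maxShardsPerKey) {
    this.maxShardsPerKey = maxShardsPerKey;
  }

  /** Returns a composite (key, shard) that the runner can route on. */
  <K, V> KV<KV<K, Integer>, V> assign(KV<K, V> element) {
    // The shard is chosen when the element is emitted, so the same user key
    // can be tagged with different shard numbers over time - that is what
    // makes the sharding dynamic.
    int shard = ThreadLocalRandom.current().nextInt(maxShardsPerKey);
    return KV.of(KV.of(element.getKey(), shard), element.getValue());
  }
}

The runner would then route on the composite key, so elements for a given
(key, shard) pair still see observably serial execution, while a hot key
can spread across up to maxShardsPerKey shards.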

>
> Suppose we say that the semantics of the URN+payload "GroupIntoBatch {
> maxSize = <n> }" allows any segmentation of the PCollection into batches of
> size <= n. The runner can replace it with the proposed implementation. But
> what does the default implementation look like? If the default
> implementation is not good (like, say, "reshuffle") then maybe figuring out
> the primitive and adding it is better than leaving a transform that is sort
> of a hack when not replaced.
>

It's a good point that we should think more carefully when extending the
API. The default implementation of `ofSize()`, or of other similar limits
like `ofBytes()`, would behave like a normal stateful DoFn. (An
`ofMaxSize()` sounds similar to `ofSize()`, since `ofSize()` guarantees
emitting a fixed number of elements whenever possible and a partial result
otherwise.) Enabling runner-determined sharding on a runner that does not
support dynamic sharding would fall back to the current (default)
implementation, so there would be no behavioral change compared to today.
Allowing the transform to accept unkeyed input might have to use a naive
sharding though, e.g., pairing each element with a random key, as in the
sketch below.
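
For that naive fallback, something like the following would work, assuming
a fixed shard count. The class and method names and the choice of Integer
keys are just for illustration:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

/** Sketch: batch an unkeyed PCollection by pairing elements with random keys. */
class NaiveShardingSketch {
  static PCollection<KV<Integer, Iterable<String>>> batchUnkeyed(
      PCollection<String> input, int numShards, long batchSize) {
    return input
        // Pair each element with a random key so the grouping can run in
        // parallel on up to numShards keys instead of a single one.
        .apply(
            WithKeys.of(
                    (SerializableFunction<String, Integer>)
                        element -> ThreadLocalRandom.current().nextInt(numShards))
                .withKeyType(TypeDescriptors.integers()))
        // The existing transform then batches per (random) key using its
        // normal stateful DoFn implementation.
        .apply(GroupIntoBatches.<Integer, String>ofSize(batchSize));
  }
}

The obvious tradeoff is that random keys throw away any per-key locality,
which is why runner-determined sharding of the existing key is the more
interesting direction.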

>
> Kenn
>
> On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <lc...@google.com> wrote:
>
>>
>>
>> On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <tyso...@google.com>
>> wrote:
>>
>>> Nice doc by the way, it's concise. Thanks for sharing and I'm excited to
>>> see this feature, particularly the PCollection<T> variant that would have
>>> been useful for the Cloud AI transforms recently introduced.
>>>
>>> On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com> wrote:
>>>>
>>>>> Thanks Luke!
>>>>>
>>>>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are
>>>>> faced with similar issues with some sinks commonly used by Dataflow,
>>>>> such as the streaming BigQuery sink. Basically, the elements are
>>>>> grouped and batched so that some subsequent (potentially expensive)
>>>>> operations can be performed at once on a batch of elements. One
>>>>> problem with the grouping is that it could impose a limit on the
>>>>> parallelism of the DoFn performing those operations. To mitigate the
>>>>> limited-parallelism problem, I have recently been looking into the
>>>>> idea of improving the `GroupIntoBatches` transform to allow the
>>>>> grouping to be dynamically sharded and therefore distributed -
>>>>> essentially a "shardable" stateful DoFn. The transform already does
>>>>> grouping and batching on the input KV - grouping on K and batching on
>>>>> V - and it could be extended to shard the key and do batching within
>>>>> each shard (meaning that we would have a sharded form of keys
>>>>> somewhere). The idea is detailed in
>>>>> https://s.apache.org/sharded-group-into-batches
>>>>>
>>>>> Along with the proposal, there are two points on which I would like
>>>>> to ask for advice:
>>>>> - Would there be cases where the sharded keys need to be visible to
>>>>> users? One case where that might be needed would be applying another
>>>>> stateful DoFn to the sharded output of the GroupIntoBatches, so that
>>>>> the semantics of the key-to-user-state mapping are respected.
>>>>>
>>>>
>>>> Does exposing an API for the sharded keys change the implementation of
>>>> the feature? If it is only an API change, I think it would be best to
>>>> avoid exposing the keys to start with, to avoid any unnecessary
>>>> dependency on the implementation. Exposing them now seems like it could
>>>> unnecessarily make it more difficult to modify the sharding
>>>> implementation in the future.
>>>>
>>>
>> Exposing the shard id makes it such that the shard id must cross the
>> portability APIs during execution. If we don't expose it, then it could
>> be implemented completely within the runner, and all the SDK has to do is
>> say that this stateful transform supports sharding or that this transform
>> is the GroupIntoBatches transform.
>>
>>
>>>>> - Would there be a need for a per-element shard id, or would a
>>>>> per-bundle shard id be sufficient? The former is more general, and we
>>>>> could still have the same shard id for all elements in a bundle. But
>>>>> the conclusion would potentially affect the implementation (like how
>>>>> the sharding information should be passed across the FnAPI, for
>>>>> example).
>>>>>
>>>>>
>>>> Are you referring to an API for a pipeline author to get the shard id?
>>>> I thought that a bundle isn't a pipeline-author abstraction but an
>>>> implementation detail; I may be wrong on this since I'm not too
>>>> familiar with this area of code. In the proposal it looks like the
>>>> shard id isn't exposed. I prefer this, as I'm not sure there is any
>>>> value for the user in having a specific 'shard id'. Is there?
>>>>
>>>
>> A bundle is exposed to a pipeline author since they regularly want to
>> know the lifetime of a bundle (e.g. startBundle/finishBundle on DoFn) in
>> order to amortize setup/teardown at the bundle level.
>>
>> Exposing the shard id is the big question: is there a use case for it?
>> I hope the community can provide guidance here; otherwise I'm with you
>> and agree that exposing it isn't necessary.
>>
>>
>>>
>>>>
>>>>> I'm very new to Beam, so I'm looking forward to hearing thoughts from
>>>>> the community. Any comments will be appreciated :)
>>>>> --
>>>>> Best regards,
>>>>> Siyuan
>>>>>
>>>>>
>>>>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> My first idea was to use a connection pool that is shared across the
>>>>>> entire worker across multiple bundles. The connection pool would TTL
>>>>>> connections that have been unused. This would help a bunch, up until
>>>>>> you hit the problem where you don't want every worker connected to
>>>>>> every resource because of sharding of the work. In this case we
>>>>>> should really be making sure that workers that have processed the
>>>>>> same "key" process the same "key" again, without limiting the number
>>>>>> of workers that can process a specific key. This is very similar to
>>>>>> what we do with a stateful DoFn, but one where the runner knows that
>>>>>> it can "shard" the key. +Siyuan Chen <syc...@google.com> has been
>>>>>> investigating something like this for Dataflow to solve scalability
>>>>>> issues with the BigQuery sink and has been looking into how a better
>>>>>> GroupIntoBatches and/or sharded stateful DoFn could really help in
>>>>>> these situations. This applies in general to lots of things where we
>>>>>> want to co-locate things with the same key but not limit the parallel
>>>>>> processing to only a single worker like stateful DoFn does today.
>>>>>>
>>>>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> We have been promoting the use of DoFn to write IO connectors for
>>>>>>> many reasons, including better composability. A common pattern that
>>>>>>> arises in such IOs is that a preceding transform prepares the
>>>>>>> specification element on split, which a subsequent DoFn then uses to
>>>>>>> read the data. You can see an example of this in FileIO [1] or in
>>>>>>> RedisIO [2].
>>>>>>>
>>>>>>> The issue is that if we process that spec in the `@ProcessElement`
>>>>>>> method, we lose the DoFn lifecycle: we cannot establish a connection
>>>>>>> in `@Setup` and close it in `@Teardown` because the spec is per
>>>>>>> element, so we end up re-creating connections, which is a quite
>>>>>>> costly operation in some systems like Cassandra/HBase/etc, and it
>>>>>>> could end up saturating the data store because of the massive
>>>>>>> creation of connections (something that already happened in the past
>>>>>>> with JdbcIO in the streaming case).
>>>>>>>
>>>>>>> In the ongoing PR that transforms Cassandra to be DoFn based [3],
>>>>>>> this subject appeared again, and we were discussing how to
>>>>>>> eventually reuse connections, maybe by a pretty naive approach of
>>>>>>> saving a previous connection (or set of identified connections)
>>>>>>> statically so it can be reused by multiple DoFn instances. We
>>>>>>> already had some issues in the past because of creating many
>>>>>>> connections in other IOs (JdbcIO) with streaming pipelines, where
>>>>>>> databases were swamped by massive amounts of connections, so reusing
>>>>>>> connections seems to be something that matters, but at the moment we
>>>>>>> do not have a clear way to do it better.
>>>>>>>
>>>>>>> Anyone have better ideas or recommendations for this scenario?
>>>>>>> Thanks in advance.
>>>>>>>
>>>>>>> Ismaël
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>>>>> [2]
>>>>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>>>>> [3] https://github.com/apache/beam/pull/10546
>>>>>>
>>>>>>
>>>>>
>>>>
