I see, the splitting of state shards is related to splitting of splittable
DoFns.

On Tue, Jun 30, 2020 at 3:36 PM Kenneth Knowles <k...@apache.org> wrote:

> I agree at the level of GroupIntoBatches. The part that is similar is if
> you implement GroupIntoBatches with a new primitive supporting
> runner-scoped state where the state is per-shard and the runner is able to
> split state shards.
>
> Kenn
>
> On Tue, Jun 30, 2020 at 9:22 AM Luke Cwik <lc...@google.com> wrote:
>
>> I'm not sure that runner-determined sharding/GroupIntoBatches applies to
>> splittable DoFns. Splittable DoFns are about taking one element with a
>> high fan-out / high-cost function and breaking it down into smaller
>> pieces, while runner-determined sharding/GroupIntoBatches is about taking
>> a lot of small elements and doing something with all of them together at
>> once (e.g. a service call).
>>
>>
>> On Mon, Jun 29, 2020 at 3:00 PM Siyuan Chen <syc...@google.com> wrote:
>>
>>>
>>> --
>>> Best regards,
>>> Siyuan
>>>
>>>
>>> On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Great doc.
>>>>
>>>> The extended API makes sense. I like how it removes a knob. The
>>>> question that I have to ask is whether there is a core model change here
>>>> or whether we can avoid it. Defining "shard" as a scope of state within
>>>> which execution is observably serial, today the model always has
>>>> key+window sharding. It seems like the two options mentioned are:
>>>>
>>>>  - a new primitive way for the runner to inject shard keys, so standard
>>>> stateful ParDo works
>>>>  - a new primitive stateful ParDo with a runner-defined scope of state
>>>> that is not observable to users
>>>>
>>>> It seems from the doc that the latter is favored. I like it better, too.
>>>> I do not know how dynamic resharding would work for either of these.
>>>> With dynamic resharding the latter seems related to SDF.
>>>>
>>> The runner-side implementation can be the same for both cases. One way
>>> would be to generate a shard number for a given input key when the data
>>> is emitted by an upstream transform and passed back to the runner; the
>>> runner can then distribute the data to the corresponding shard for
>>> processing by GroupIntoBatches. It's dynamic in that data with the same
>>> key emitted at different times can be tagged with different shard
>>> numbers. SDF is mainly for sources AFAIU, so it is different (I'm not
>>> familiar with that, so correct me if I am wrong :p).
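>>>
>>> To make that concrete, conceptually the runner would address state and
>>> batches by a composite (key, shard) pair, roughly like the hypothetical
>>> sketch below (illustration only, not the actual design):
>>>
>>> // Hypothetical sketch of a runner-generated composite key.
>>> final class ShardedKey<K> {
>>>   private final K key;        // the user-visible key
>>>   private final int shardId;  // picked by the runner; data with the same
>>>                               // key can get different shard ids over time
>>>
>>>   ShardedKey(K key, int shardId) {
>>>     this.key = key;
>>>     this.shardId = shardId;
>>>   }
>>>   // equals()/hashCode() would cover both fields, so state and batching
>>>   // are scoped per (key, shard) rather than per key.
>>> }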
>>>
>>>>
>>>> Suppose we say that the semantics of the URN+payload "GroupIntoBatch {
>>>> maxSize = <n> }" allows any segmentation of the PCollection into batches of
>>>> size <= n. The runner can replace it with the proposed implementation. But
>>>> what does the default implementation look like? If the default
>>>> implementation is not good (like, say, "reshuffle") then maybe figuring out
>>>> the primitive and adding it is better than leaving a transform that is sort
>>>> of a hack when not replaced.
>>>>
>>>
>>> It's a good point that we should think more carefully when extending the
>>> API. The default implementation of `ofSize()` or other similar limits
>>> like `ofBytes()` would behave like a normal stateful DoFn. (`ofMaxSize`
>>> sounds similar to `ofSize()`, since `ofSize()` guarantees emitting a
>>> fixed number of elements whenever possible and emitting a partial result
>>> otherwise.) Enabling runner-determined sharding on a runner that does not
>>> support dynamic sharding would fall back to the current (default)
>>> implementation, so there would be no behavioral change compared to today.
>>> Allowing the transform to accept unkeyed input might have to use naive
>>> sharding though, e.g., pairing each element with a random key.
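>>>
>>> For the unkeyed case, that naive fallback could look roughly like the
>>> sketch below. WithKeys, GroupIntoBatches and TypeDescriptors are the
>>> existing Beam transforms/utilities; MyElement, the shard count and the
>>> batch size are made up for illustration:
>>>
>>> // Sketch only: pair each element with a random key, then batch per key.
>>> static PCollection<KV<Integer, Iterable<MyElement>>> naiveShardedBatches(
>>>     PCollection<MyElement> unkeyed) {
>>>   int numShards = 16;  // illustrative; the proposal lets the runner decide
>>>   return unkeyed
>>>       .apply(WithKeys.<Integer, MyElement>of(
>>>               e -> ThreadLocalRandom.current().nextInt(numShards))
>>>           .withKeyType(TypeDescriptors.integers()))
>>>       .apply(GroupIntoBatches.<Integer, MyElement>ofSize(100));
>>> }
>>>
>>> With purely random keys we lose any co-location by key, which is part of
>>> why letting the runner decide the sharding is more attractive.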
>>>
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <tyso...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Nice doc by the way, it's concise. Thanks for sharing and I'm excited
>>>>>> to see this feature, particularly the PCollection<T> variant that would
>>>>>> have been useful for the Cloud AI transforms recently introduced.
>>>>>>
>>>>>> On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tyso...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <syc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Luke!
>>>>>>>>
>>>>>>>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are
>>>>>>>> faced with similar issues with some sinks commonly used by Dataflow,
>>>>>>>> such as the streaming BigQuery sink. Basically the elements are
>>>>>>>> grouped and batched so some following operations (potentially
>>>>>>>> expensive) can be performed at once on a batch of elements. One
>>>>>>>> problem with the grouping is that it could impose a limit on the
>>>>>>>> parallelism of the DoFn performing those operations. To mitigate the
>>>>>>>> limited parallelism problem, recently I have been looking into the
>>>>>>>> idea of improving the `GroupIntoBatches` transform to allow the
>>>>>>>> grouping to be dynamically sharded and therefore distributed -
>>>>>>>> essentially a "shardable" stateful DoFn. The transform already does
>>>>>>>> grouping and batching on the input KV - grouping on K and batching on
>>>>>>>> V - and it could be extended to be able to shard the key and do
>>>>>>>> batching within each shard (meaning that we would have a sharded form
>>>>>>>> of keys somewhere). The idea is detailed in
>>>>>>>> https://s.apache.org/sharded-group-into-batches
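>>>>>>>>
>>>>>>>> For reference, the existing (non-sharded) usage looks roughly like the
>>>>>>>> sketch below; the element type (TableRow) and batch size are just for
>>>>>>>> illustration:
>>>>>>>>
>>>>>>>> // GroupIntoBatches today: batches values per key (and window).
>>>>>>>> static PCollection<KV<String, Iterable<TableRow>>> batchRows(
>>>>>>>>     PCollection<KV<String, TableRow>> rows) {
>>>>>>>>   return rows.apply(GroupIntoBatches.<String, TableRow>ofSize(500));
>>>>>>>> }
>>>>>>>> // All batches for a given key (and window) are produced by a single
>>>>>>>> // worker, which is the parallelism limit described above.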
>>>>>>>>
>>>>>>>> Along with the proposal, there are two points I would like to ask
>>>>>>>> for advice:
>>>>>>>> - Would there be cases where the sharded keys need to be visible to
>>>>>>>> users? One case where that might be needed would be to apply another
>>>>>>>> stateful DoFn to the sharded output of the GroupIntoBatches, so the
>>>>>>>> semantics of key-to-user state mapping is respected.
>>>>>>>>
>>>>>>>
>>>>>>> Does exposing an API for the sharded keys change the implementation
>>>>>>> of the feature? If it is only an API change, I think it would be best
>>>>>>> to avoid exposing the keys to start with, so there is no unnecessary
>>>>>>> dependency on the implementation. Exposing them now seems like it
>>>>>>> could make it unnecessarily difficult to modify the sharding
>>>>>>> implementation in the future.
>>>>>>>
>>>>>>
>>>>> Exposing the shard id makes it such that the shard id must cross the
>>>>> portability APIs during execution. If we don't expose it then it could be
>>>>> implemented completely within the runner and all the SDK has to do is say
>>>>> that this stateful transform supports sharding or this transform is the
>>>>> GroupIntoBatches transform.
>>>>>
>>>>>
>>>>>>>> - Would there be a need to have a per-element shard id, or would a
>>>>>>>> per-bundle shard id be sufficient? The former is more general, and we
>>>>>>>> could still have the same shard id for all elements in a bundle. But
>>>>>>>> the conclusion would potentially affect the implementation (like how
>>>>>>>> the sharding information should be passed across the FnAPI, for
>>>>>>>> example).
>>>>>>>>
>>>>>>>>
>>>>>>> Are you referring to an API for a pipeline author to get the shard
>>>>>>> id? I thought that a bundle isn't a pipeline-author abstraction but an
>>>>>>> implementation detail; I may be wrong about this since I'm not too
>>>>>>> familiar with this area of the code. In the proposal it looks like the
>>>>>>> shard id isn't exposed. I prefer this, as I'm not sure there is any
>>>>>>> value for the user in having a specific 'shard id'. Is there?
>>>>>>>
>>>>>>
>>>>> A bundle is exposed to a pipeline author since they regularly want to
>>>>> know the lifetime of a bundle (e.g. startBundle/finishBundle on DoFn) to
>>>>> amortize setup/teardown at the bundle level.
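>>>>>
>>>>> For example (sketch only; MyBatchClient and its flush call are
>>>>> hypothetical, and error handling is omitted), the usual bundle-level
>>>>> amortization pattern is:
>>>>>
>>>>> // Buffer elements per bundle and flush once in finishBundle.
>>>>> class BatchingFn extends DoFn<String, Void> {
>>>>>   private transient List<String> buffer;
>>>>>
>>>>>   @StartBundle
>>>>>   public void startBundle() {
>>>>>     buffer = new ArrayList<>();  // fresh buffer for each bundle
>>>>>   }
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(@Element String element) {
>>>>>     buffer.add(element);
>>>>>   }
>>>>>
>>>>>   @FinishBundle
>>>>>   public void finishBundle() {
>>>>>     MyBatchClient.flush(buffer);  // one call per bundle, not per element
>>>>>   }
>>>>> }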
>>>>>
>>>>> Exposing the shard id is the big question: is there a use case for it?
>>>>> I hope the community can provide guidance here; otherwise I'm with you
>>>>> and also agree that exposing it isn't necessary.
>>>>>
>>>>>
>>>>>>
>>>>>>>
>>>>>>>> I'm very new to Beam, so I'm looking forward to hearing the thoughts
>>>>>>>> from the community. Any comments will be appreciated :)
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Siyuan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> My first idea was to use a connection pool that is shared across the
>>>>>>>>> entire worker, across multiple bundles. The connection pool would
>>>>>>>>> expire (TTL) connections that have been unused. This would help a
>>>>>>>>> bunch, up until you hit the problem where you don't want every worker
>>>>>>>>> connected to every resource because of sharding of the work. In this
>>>>>>>>> case we should really be making sure that workers that have processed
>>>>>>>>> the same "key" process the same "key" again, without limiting the
>>>>>>>>> number of workers that can process a specific key. This is very
>>>>>>>>> similar to what we do with a stateful DoFn, but one where the runner
>>>>>>>>> knows that it can "shard" the key. +Siyuan Chen <syc...@google.com>
>>>>>>>>> has been investigating something like this for Dataflow to solve
>>>>>>>>> scalability issues with the BigQuery sink and has been looking into
>>>>>>>>> how a better GroupIntoBatches and/or sharded stateful DoFn could
>>>>>>>>> really help in these situations. This applies in general to lots of
>>>>>>>>> things where we want to co-locate things with the same key but not
>>>>>>>>> limit the parallel processing to only a single worker like stateful
>>>>>>>>> DoFn does today.
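>>>>>>>>>
>>>>>>>>> A rough sketch of that first idea, using Guava's CacheBuilder
>>>>>>>>> (Connection and ConnectionFactory are hypothetical, close() is
>>>>>>>>> assumed to be unchecked, and the TTL value is made up):
>>>>>>>>>
>>>>>>>>> // Worker-wide pool shared across bundles; unused connections expire.
>>>>>>>>> // Assumes imports from com.google.common.cache and java.util.concurrent.
>>>>>>>>> final class ConnectionPool {
>>>>>>>>>   private static final RemovalListener<String, Connection> CLOSER =
>>>>>>>>>       notification -> notification.getValue().close();
>>>>>>>>>
>>>>>>>>>   private static final Cache<String, Connection> POOL =
>>>>>>>>>       CacheBuilder.newBuilder()
>>>>>>>>>           .expireAfterAccess(5, TimeUnit.MINUTES)
>>>>>>>>>           .removalListener(CLOSER)
>>>>>>>>>           .build();
>>>>>>>>>
>>>>>>>>>   static Connection get(String endpoint) throws ExecutionException {
>>>>>>>>>     // Creates the connection on first use, reuses it afterwards.
>>>>>>>>>     return POOL.get(endpoint, () -> ConnectionFactory.connect(endpoint));
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> A real pool would also need to avoid closing a connection that is
>>>>>>>>> still in use, and this only solves reuse within a worker; it does not
>>>>>>>>> stop every worker from eventually connecting to every resource, which
>>>>>>>>> is where the key-affinity / sharded stateful DoFn idea comes in.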
>>>>>>>>>
>>>>>>>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <ieme...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> We have been promoting the use of DoFn to write IO connectors for
>>>>>>>>>> many reasons, including better composability. A common pattern that
>>>>>>>>>> arises in such IOs is that a preceding transform prepares, on split,
>>>>>>>>>> the specification element that a subsequent DoFn uses to read the
>>>>>>>>>> data. You can see an example of this in FileIO [1] or in RedisIO [2].
>>>>>>>>>>
>>>>>>>>>> The issue is that if we process that spec in the `@ProcessElement`
>>>>>>>>>> method, we lose the DoFn lifecycle: we cannot establish a connection
>>>>>>>>>> in `@Setup` and close it in `@Teardown` because the spec is per
>>>>>>>>>> element, so we end up recreating connections, which is quite a
>>>>>>>>>> costly operation in some systems like Cassandra/HBase/etc, and it
>>>>>>>>>> could end up saturating the data store because of the massive
>>>>>>>>>> creation of connections (something that already happened in the
>>>>>>>>>> past with JdbcIO in the streaming case).
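>>>>>>>>>>
>>>>>>>>>> In code the pattern looks roughly like this (ReadSpec, Record and
>>>>>>>>>> Connection are placeholders for whatever the IO defines; Connection
>>>>>>>>>> is assumed to be AutoCloseable):
>>>>>>>>>>
>>>>>>>>>> // Sketch of the problematic shape: the spec only arrives as an
>>>>>>>>>> // element, so @Setup/@Teardown cannot own the connection.
>>>>>>>>>> class ReadFn extends DoFn<ReadSpec, Record> {
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void process(@Element ReadSpec spec, OutputReceiver<Record> out) {
>>>>>>>>>>     try (Connection conn = Connection.open(spec)) {  // per element!
>>>>>>>>>>       for (Record record : conn.read()) {
>>>>>>>>>>         out.output(record);
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>> }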
>>>>>>>>>>
>>>>>>>>>> In the ongoing PR that transforms Cassandra to be DoFn based [3],
>>>>>>>>>> this subject appeared again, and we were discussing how to
>>>>>>>>>> eventually reuse connections, maybe by a pretty naive approach of
>>>>>>>>>> saving a previous connection (or a set of identified connections)
>>>>>>>>>> statically so it can be reused by multiple DoFn instances. We
>>>>>>>>>> already had some issues in the past because of creating many
>>>>>>>>>> connections in other IOs (JdbcIO) with streaming pipelines, where
>>>>>>>>>> databases were swamped by massive amounts of connections, so
>>>>>>>>>> reusing connections seems to be something that matters, but at the
>>>>>>>>>> moment we do not have a clear way to do this better.
>>>>>>>>>>
>>>>>>>>>> Anyone have better ideas or recommendations for this scenario?
>>>>>>>>>> Thanks in advance.
>>>>>>>>>>
>>>>>>>>>> Ismaël
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>>>>>>>>>> [2]
>>>>>>>>>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
>>>>>>>>>> [3] https://github.com/apache/beam/pull/10546
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
