> Combine.perKey ... certainly is standardized / well-defined

Is there any document where it's defined?

Viliam

On Tue, 16 Apr 2019 at 18:27, Kenneth Knowles <k...@apache.org> wrote:

> On Tue, Apr 16, 2019 at 9:18 AM Reuven Lax <re...@google.com> wrote:
>
>> A common request (especially in streaming) is to support sorting values
>> by timestamp, not by the full value.
>>
>
> On this point, I think an explicit secondary key probably addresses the
> need. Naively implemented, the "sort by values" use case would have a lot
> of data duplication so we might have some payload on the transform to
> configure that, or a couple of related transforms.
>
> Kenn
>
>
>>
>> Reuven
>>
>> On Tue, Apr 16, 2019 at 9:08 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> 1. This is clearly useful, and extensively used. Agree with all that. I
>>> think it can work for batch and streaming equally well if sorting is
>>> required only per "pane", though I might be overlooking something.
>>>
>>> 2. A transform need not be primitive to be well-defined and executed in
>>> a special way by most runners. For example, Combine.perKey is not a
>>> "primitive", where primitive means "axiomatic, lacking an expansion to
>>> other transforms". It has a composite definition in terms of other
>>> transforms. However, it certainly is standardized / well-defined and
>>> executed in a custom way by all runners, with the possible exception of
>>> direct runners (I didn't double check this). To make something a
>>> standardized well-defined transform it just needs a URN and an explicitly
>>> documented payload that goes along with the URN (which might be empty).
>>> Apologies if this is going into details you already know; I just want to
>>> emphasize that this is a key aspect of Beam's design, avoiding
>>> proliferation of primitives while allowing runners to optimize execution.
>>>
>>> In order for GroupByKeyAndSortValues* to have a status analogous to
>>> Combine.perKey it needs a URN (say, "beam:transforms:gbk-and-sort-values")
>>> and a code location where it can have a fallback composite definition. I
>>> would suggest piloting the idea of making experimental features opt-in
>>> includes with "experimental" in the artifact id, so something like artifact
>>> id "org.apache.beam:beam-sdks-java-experimental-gbk-and-sort-values" (very
>>> long, open to improvement). Another idea would be
>>> "org.apache.beam.experiments" as a group id.
>>>
>>> Kenn
>>>
>>> *Note that BatchViewOverrides.GroupByKeyAndSortValuesOnly is actually an
>>> even lower-level primitive; the "Only" part indicates that it is unaware
>>> of windowing and event time.
>>>
>>> On Tue, Apr 16, 2019 at 7:42 AM Gleb Kanterov <g...@spotify.com> wrote:
>>>
>>>> At the moment, portability has a GroupByKey transform. In most data
>>>> processing frameworks, such as Hadoop MR and Apache Spark, there is a
>>>> concept of secondary sorting during the shuffle phase. Dataflow worker code
>>>> has it under the name BatchViewOverrides.GroupByKeyAndSortValuesOnly [1],
>>>> it's PTransform<PCollection<KV<K1, KV<K2, V>>>, PCollection<KV<K1,
>>>> Iterable<KV<K2, V>>>>>. It does sharding by K1 and sorting by K2 within
>>>> each shard.
>>>>
>>>> I see a lot of value in adding GroupByKeyAndSort to the list of
>>>> built-in transforms so that runners can efficiently override it. It's
>>>> possible to define GroupByKeyAndSort as GroupByKey+SortValues [2]; however,
>>>> having it as a primitive will open the possibility of a more efficient
>>>> implementation. What could be the potential drawbacks? I haven't thought
>>>> much about how it could work for non-batch pipelines.
>>>>
>>>> Gleb
>>>>
>>>> [1]:
>>>> https://github.com/spotify/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java#L1246
>>>> [2]:
>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
>>>>
>>>>
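
For reference, the fallback composite definition discussed in the thread
(GroupByKey followed by a per-key sort on the secondary key) can be sketched
with plain Java collections. This is only an illustration of the semantics,
not Beam code: the class and method names are hypothetical, Map.Entry stands
in for KV, and windowing, panes, and coders are ignored entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory illustration; not part of the Beam SDK.
class GroupAndSortSketch {

    // Takes (k1, (k2, v)) records, groups them by k1, then sorts each
    // group by k2. Map.Entry plays the role of Beam's KV here.
    static <K1, K2 extends Comparable<K2>, V>
            Map<K1, List<Map.Entry<K2, V>>> groupByKeyAndSortValues(
                    List<Map.Entry<K1, Map.Entry<K2, V>>> input) {
        // Step 1: the "GroupByKey" part, keyed on the primary key k1.
        Map<K1, List<Map.Entry<K2, V>>> groups = new LinkedHashMap<>();
        for (Map.Entry<K1, Map.Entry<K2, V>> record : input) {
            groups.computeIfAbsent(record.getKey(), k -> new ArrayList<>())
                  .add(record.getValue());
        }
        // Step 2: the "SortValues" part, ordering each group by k2.
        // List.sort is stable, so equal secondary keys keep input order.
        for (List<Map.Entry<K2, V>> values : groups.values()) {
            values.sort(Map.Entry.comparingByKey());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Map.Entry<Integer, String>>> input = List.of(
                Map.entry("a", Map.entry(2, "x")),
                Map.entry("b", Map.entry(1, "y")),
                Map.entry("a", Map.entry(1, "z")));
        System.out.println(groupByKeyAndSortValues(input));
    }
}
```

A runner that recognizes the transform's URN would be free to replace both
steps with a single shuffle that sorts on the secondary key, which is where
the efficiency gain would come from; the two-step version above is what a
runner without such support would fall back to.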
