+1 to splitting out the Hadoop deps.

As has been said, there's no need to move it to core for runners to
optimize this. But perhaps a case could be made that this belongs in core?
(On the other hand, recent discussions indicate a desire to make core even
smaller.)

Also, +1 to rethinking the API. The current API takes elements of the form

    KV<PrimaryK, Iterable<KV<SecondaryK, SecondaryValue>>>

and the proposed API (from what I understand) would instead take elements
of the form

    KV<PrimaryK, Iterable<Value>>

and extract the secondary key from Value for sorting. This may be less
efficient when SecondaryValue above is empty or trivial. Specifically, a
runner may have to shuffle values of the form
serialized(key)+serialized(extractSecondaryKey(value))+serialized(value),
where serialized(value) may largely duplicate
serialized(extractSecondaryKey(value)). I'm not sure this difference merits
the more complex API (especially as the only API).
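To make the redundancy concrete, here is a rough sketch that computes the per-element shuffle record size for both API shapes when the value is a bare `long` that is also its own secondary key (so SecondaryValue is empty in the current API). This is not Beam code: `ByteBuffer` stands in for Beam coders, and the 4-byte length prefix for the key, as well as the class and method names, are assumptions made purely for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical shuffle record layouts; ByteBuffer stands in for Beam coders. */
public class ShuffleRecordSizes {

  // Current API: KV<PrimaryK, Iterable<KV<SecondaryK, SecondaryValue>>> with an
  // empty SecondaryValue, so each record is serialized(key) + serialized(secondaryKey).
  static byte[] currentApiRecord(String key, long secondaryKey) {
    byte[] k = key.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(4 + k.length + 8);
    buf.putInt(k.length).put(k); // serialized(key), assumed 4-byte length prefix
    buf.putLong(secondaryKey);   // serialized(secondaryKey)
    return buf.array();          // the empty SecondaryValue adds no bytes
  }

  // Proposed API: KV<PrimaryK, Iterable<Value>> plus an extractor. The runner
  // shuffles the extracted key (for sorting) and the full value (for the user).
  static byte[] proposedApiRecord(String key, long value) {
    byte[] k = key.getBytes(StandardCharsets.UTF_8);
    long secondaryKey = value; // extractSecondaryKey is the identity here
    ByteBuffer buf = ByteBuffer.allocate(4 + k.length + 8 + 8);
    buf.putInt(k.length).put(k); // serialized(key)
    buf.putLong(secondaryKey);   // serialized(extractSecondaryKey(value))
    buf.putLong(value);          // serialized(value): the same 8 bytes again
    return buf.array();
  }

  public static void main(String[] args) {
    int current = currentApiRecord("user", 42L).length;
    int proposed = proposedApiRecord("user", 42L).length;
    // prints "current: 16 bytes, proposed: 24 bytes"
    System.out.println("current: " + current + " bytes, proposed: " + proposed + " bytes");
  }
}
```

Under these assumptions the proposed layout carries the secondary key twice; how much that matters in practice depends on the real coders and on how large the values are relative to the extracted key.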

I would also like to isolate the user from dealing directly with coders
and from having to know which ones are order-preserving in their encoding.
(Technically, BigEndianLongCoder as used above is problematic, as it sorts
negative values after positive ones; an easy trap to fall into.) Perhaps
we could allow a function V -> KS, where KS is a type for which we know of a
(registered) order-preserving coder. Order-preserving KV and Tuple coders
could be built out of these; it's a property like determinism for key coders. (We'd
need a "LengthPrefixCoder" that preserves order as well?) This notion could
be useful for optimizing operations like Top and Quantiles as well.
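A minimal sketch of what "order-preserving" means here, assuming encoded keys are compared as unsigned bytes lexicographically, as a byte-based shuffle would do. The helpers below are hypothetical, not existing Beam coders. Flipping the sign bit fixes the negative-after-positive problem of a plain big-endian long. For strings, a naive length prefix breaks order ("b" would sort before "ab"), so the sketch instead escapes 0x00 as 0x00 0xFF and appends a 0x00 0x01 terminator, which is self-delimiting and keeps order when fields are concatenated into a composite (KV/tuple-like) key.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical order-preserving encodings (illustration only, not Beam APIs). */
public class OrderPreservingEncoding {

  // Plain big-endian two's complement: -1 becomes FF..FF and sorts after 1.
  static byte[] bigEndian(long v) {
    return ByteBuffer.allocate(8).putLong(v).array();
  }

  // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto 00..00..FF..FF,
  // so unsigned byte order matches signed numeric order.
  static byte[] orderedLong(long v) {
    return bigEndian(v ^ Long.MIN_VALUE);
  }

  // UTF-8 bytes sort in code-point order. Escape 0x00 as 0x00 0xFF and terminate
  // with 0x00 0x01: the terminator sorts below any continuation, so a string sorts
  // before its extensions, and the field boundary is unambiguous in a composite key.
  static byte[] orderedString(String s) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
      out.write(b);
      if (b == 0) {
        out.write(0xFF);
      }
    }
    out.write(0x00);
    out.write(0x01);
    return out.toByteArray();
  }

  // Composite key: concatenation of self-delimiting, order-preserving fields.
  static byte[] orderedPair(String first, long second) {
    byte[] a = orderedString(first);
    byte[] b = orderedLong(second);
    byte[] result = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, result, a.length, b.length);
    return result;
  }

  public static void main(String[] args) {
    // true: plain big-endian puts -1 after 1
    System.out.println(Arrays.compareUnsigned(bigEndian(-1L), bigEndian(1L)) > 0);
    // true: the sign-flipped encoding puts -1 before 1
    System.out.println(Arrays.compareUnsigned(orderedLong(-1L), orderedLong(1L)) < 0);
    // true: "ab" sorts before "b"
    System.out.println(Arrays.compareUnsigned(orderedString("ab"), orderedString("b")) < 0);
  }
}
```

`Arrays.compareUnsigned(byte[], byte[])` requires Java 9 or later. Whether a given coder is order-preserving would still have to be tracked as a property of the coder, along the lines suggested above.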


On Thu, Oct 18, 2018 at 11:19 AM David Morávek <[email protected]>
wrote:

> Kenn, I believe we should not introduce a Hadoop dependency into either the
> SDKs or the runners. We could split sorting into two modules: one with the
> transformation plus an in-memory implementation (this is the part I'd love to
> see become part of sdks-java-core), and a second module with a more robust
> external sorter (with the Hadoop dependency).
>
> Does this make sense?
>
>
> On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin <[email protected]> wrote:
>
>> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> The runner can always just depend on the sorter to do it the legacy way
>>> by class matching; it shouldn't incur other dependency penalties... but now
>>> that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
>>> price to pay for a user in any event. Are those Hadoop deps reasonably
>>> self-contained?
>>>
>>
>> Nice catch, Kenn! This is indeed why we didn't originally include the
>> Sorter in core. The Hadoop deps have an enormous surface, or did at the
>> time.
>>
>> Dan
>>
>>
>>>
>>> Kenn
>>>
>>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>>> executed via portability, since the runner will be able to perform
>>>> PTransform replacement and optimization based upon the URN of the transform
>>>> and its payload, so it would never need to have the "Sorter" class on its
>>>> classpath.
>>>>
>>>> I'm ambivalent about whether merging it now is worth it.
>>>>
>>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek <[email protected]>
>>>> wrote:
>>>>
>>>>> We can always fall back to the external sorter in the case of merging
>>>>> windows. I reckon that in this case the values usually fit in memory, so
>>>>> it would not be an issue.
>>>>>
>>>>> In the case of non-merging windows, the runner implementation would
>>>>> probably need to group elements by window as well during the shuffle.
>>>>>
>>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> One concern would be merging windows. This happens after shuffle, so
>>>>>> even if the shuffle were sorted you would need to do a sorted merge of
>>>>>> two sorted buffers.
>>>>>>
>>>>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I want to summarize my thoughts on per-key value sorting.
>>>>>>>
>>>>>>> Currently we have a separate module for sorting extension. The
>>>>>>> extension contains *SortValues* transformation and implementations
>>>>>>> of different sorters.
>>>>>>>
>>>>>>> Performance-wise, it would be great to be able *to delegate sorting
>>>>>>> to a runner* if it supports sort-based shuffle. In order to do so,
>>>>>>> we should *move the SortValues transformation to sdks-java-core*, so a
>>>>>>> runner can easily provide its own implementation.
>>>>>>>
>>>>>>> The robust implementation is needed mainly for building HFiles
>>>>>>> for HBase bulk load. When using the external sorter, we often sort the
>>>>>>> whole data set twice (the shuffle may already have done the job).
>>>>>>>
>>>>>>> SortValues cannot use a custom comparator, because we want to be able
>>>>>>> to push the sorting logic down to a byte-based shuffle.
>>>>>>>
>>>>>>> The usage of the SortValues transformation is a little bit confusing. I
>>>>>>> think we should add a *SortValues.perKey* method, which accepts a
>>>>>>> secondary key extractor and coder, as the usage would be easier to
>>>>>>> understand. Also, this explicitly states that we sort values
>>>>>>> *perKey* only and that we sort using an *encoded secondary key*.
>>>>>>> Example usage:
>>>>>>>
>>>>>>>     PCollection<KV<String, Long>> input = ...;
>>>>>>>     input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()));
>>>>>>>
>>>>>>> What do you think? Is this the right direction?
>>>>>>>
>>>>>>> Thanks for the comments!
>>>>>>>
>>>>>>> Links:
>>>>>>> -
>>>>>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>>>>>
>>>>>>
