The runner can always just depend on the sorter to do it the legacy way by
class matching; it shouldn't incur other dependency penalties... but now
that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
price to pay for a user in any event. Are those Hadoop deps reasonably
self-contained?

Kenn

On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik <[email protected]> wrote:

> Merging the sorter into sdks-java-core isn't needed for pipelines executed
> via portability since the Runner will be able to perform PTransform
> replacement and optimization based upon the URN of the transform and its
> payload so it would never need to have the "Sorter" class in its classpath.
>
> I'm ambivalent about whether merging it now is worth it.
>
> On Wed, Oct 17, 2018 at 2:31 PM David Morávek <[email protected]>
> wrote:
>
>> We can always fall back to the External sorter in case of merging
>> windows. I reckon in this case, values usually fit in memory, so it would
>> not be an issue.
>>
>> In case of non-merging windows, runner implementation would probably
>> require to group elements also by window during shuffle.
>>
>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <[email protected]> wrote:
>>
>>> One concern would be merging windows. This happens after shuffle, so
>>> even if the shuffle were sorted you would need to do a sorted merge of two
>>> sorted buffers.
>>>
>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <[email protected]>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I want to summarize my thoughts on the per key value sorting.
>>>>
>>>> Currently we have a separate module for sorting extension. The
>>>> extension contains *SortValues* transformation and implementations of
>>>> different sorters.
>>>>
>>>> Performance-wise it would be great to be able* to delegate sorting to
>>>> a runner* if it supports sort based shuffle. In order to do so, we
>>>> should *move SortValues transformation to sdks-java-core*, so a runner
>>>> can easily provide its own implementation.
>>>>
>>>> The robust implementation is needed mainly for building of HFiles for
>>>> the HBase bulk load. When using external sorter, we often sort the whole
>>>> data set twice (shuffle may already did a job).
>>>>
>>>> SortValues can not use custom comparator, because we want to be able to
>>>> push sorting logic down to a byte based shuffle.
>>>>
>>>> The usage of SortValues transformation is little bit confusing. I think
>>>> we should add a *SortValues.perKey* method, which accepts a secondary
>>>> key extractor and coder, as the usage would be easier to understand. Also,
>>>> this explicitly states, that we sort values *perKey* only and that we
>>>> sort using an *encoded secondary key*. Example usage:
>>>>
>>>>
>>>> *PCollection<KV<String, Long>> input = ...;*
>>>> *input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()))*
>>>>
>>>> What do you think? Is this the right direction?
>>>>
>>>> Thanks for the comments!
>>>>
>>>> Links:
>>>> -
>>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>>
>>>

Reply via email to