Merging the sorter into sdks-java-core isn't needed for pipelines executed
via portability: the Runner will be able to perform PTransform replacement
and optimization based on the URN of the transform and its payload, so it
would never need to have the "Sorter" class on its classpath.

I'm ambivalent about whether merging it now is worth it.

On Wed, Oct 17, 2018 at 2:31 PM David Morávek <[email protected]>
wrote:

> We can always fall back to the External sorter in case of merging windows.
> I reckon in this case, values usually fit in memory, so it would not be an
> issue.
>
> In case of non-merging windows, the runner implementation would probably
> also need to group elements by window during the shuffle.
>
> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <[email protected]> wrote:
>
>> One concern would be merging windows. This happens after shuffle, so even
>> if the shuffle were sorted you would need to do a sorted merge of two
>> sorted buffers.
>>
>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <[email protected]>
>> wrote:
>>
>>> Hello,
>>>
>>> I want to summarize my thoughts on the per key value sorting.
>>>
>>> Currently we have a separate module for the sorting extension. The
>>> extension contains the *SortValues* transformation and implementations of
>>> different sorters.
>>>
>>> Performance-wise it would be great to *be able to delegate sorting to a
>>> runner* if it supports sort based shuffle. In order to do so, we should
>>> *move SortValues transformation to sdks-java-core*, so a runner can easily
>>> provide its own implementation.
>>>
>>> The robust implementation is needed mainly for building HFiles for
>>> the HBase bulk load. When using the external sorter, we often sort the whole
>>> data set twice (the shuffle may already have done the job).
>>>
>>> SortValues cannot use a custom comparator, because we want to be able to
>>> push the sorting logic down to a byte based shuffle.
>>>
>>> The usage of the SortValues transformation is a little bit confusing. I
>>> think we should add a *SortValues.perKey* method, which accepts a secondary
>>> key extractor and coder, as the usage would be easier to understand. Also,
>>> this explicitly states that we sort values *perKey* only and that we
>>> sort using an *encoded secondary key*. Example usage:
>>>
>>>
>>> *PCollection<KV<String, Long>> input = ...;*
>>> *input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()))*
>>>
>>> What do you think? Is this the right direction?
>>>
>>> Thanks for the comments!
>>>
>>> Links:
>>> -
>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>
>>
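
For readers following the quoted thread, here is a minimal sketch of two ideas
it touches on: sorting by an *encoded secondary key* using plain byte
comparison, and the "sorted merge of two sorted buffers" that merging windows
would require after the shuffle. It does not use Beam at all. The class and
method names are hypothetical, the 8-byte big-endian encoding is only a
stand-in for what a coder might emit, and the unsigned lexicographic comparison
is an assumption about how a byte based shuffle could order keys. Note that
this ordering matches numeric order only for non-negative longs; negative
values would sort after positive ones.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Beam or of the proposal.
class EncodedKeySortSketch {

  // Encode a long as 8 big-endian bytes (stand-in for a coder's output).
  static byte[] encodeBigEndian(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  static long decodeBigEndian(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  // Unsigned lexicographic comparison, the only ordering a byte based
  // sorter can apply without knowing the element type.
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  // Encode the values, then sort the encoded forms by bytes alone.
  static List<byte[]> sortEncoded(long... values) {
    List<byte[]> out = new ArrayList<>();
    for (long v : values) {
      out.add(encodeBigEndian(v));
    }
    out.sort(EncodedKeySortSketch::compareBytes);
    return out;
  }

  // Merge two buffers that are each already sorted by compareBytes.
  static List<byte[]> mergeSorted(List<byte[]> left, List<byte[]> right) {
    List<byte[]> merged = new ArrayList<>(left.size() + right.size());
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      if (compareBytes(left.get(i), right.get(j)) <= 0) {
        merged.add(left.get(i++));
      } else {
        merged.add(right.get(j++));
      }
    }
    while (i < left.size()) {
      merged.add(left.get(i++));
    }
    while (j < right.size()) {
      merged.add(right.get(j++));
    }
    return merged;
  }

  static long[] decodeAll(List<byte[]> encoded) {
    long[] out = new long[encoded.size()];
    for (int i = 0; i < out.length; i++) {
      out[i] = decodeBigEndian(encoded.get(i));
    }
    return out;
  }

  public static void main(String[] args) {
    List<byte[]> a = sortEncoded(42L, 7L, 300L);
    List<byte[]> b = sortEncoded(256L, 1L);
    // prints [1, 7, 42, 256, 300]
    System.out.println(Arrays.toString(decodeAll(mergeSorted(a, b))));
  }
}
```

The merge step is linear in the combined buffer size, which is why a sorted
shuffle would still help in the merging-windows case even though it cannot
avoid the merge itself.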
