Kenn, I believe we should not introduce a Hadoop dependency to either the
SDKs or the runners. We could split sorting into two packages: one with the
transformation plus an in-memory implementation (this is the part I'd love
to see become part of sdks-java-core) and a second module with a more robust
external sorter (with the Hadoop dependency).
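
A rough sketch of what the in-memory part might look like, in plain Java
with no Beam or Hadoop types. The class and method names here are
hypothetical, not existing Beam API; the only idea taken from the thread
is that values are ordered by the bytes of an encoded secondary key,
compared as unsigned bytes, the way a byte-based shuffle would.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical in-memory sorter: orders values by an encoded secondary key. */
public class InMemorySecondaryKeySort {

  /** Encodes a long as 8 big-endian bytes (assumed layout of a big-endian long coder). */
  public static byte[] encodeBigEndian(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  /** Lexicographic comparison of two byte arrays, treating each byte as unsigned. */
  public static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    // On a common prefix, the shorter array sorts first.
    return Integer.compare(a.length, b.length);
  }

  /**
   * Returns a sorted copy of the values of a single key. The encoder is
   * re-applied on every comparison, which keeps the sketch short; a real
   * implementation would likely encode each secondary key once.
   */
  public static <V> List<V> sortByEncodedKey(List<V> values, Function<V, byte[]> encoder) {
    List<V> sorted = new ArrayList<>(values);
    sorted.sort((x, y) -> compareUnsigned(encoder.apply(x), encoder.apply(y)));
    return sorted;
  }

  public static void main(String[] args) {
    List<Long> values = List.of(42L, 7L, 1000L);
    System.out.println(sortByEncodedKey(values, InMemorySecondaryKeySort::encodeBigEndian));
  }
}
```

One caveat with this ordering: under unsigned byte comparison, negative
longs in two's-complement big-endian form sort after the non-negative
ones, so the byte order matches the numeric order only for non-negative
keys.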

Does this make sense?


On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin <[email protected]> wrote:

> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles <[email protected]> wrote:
>
>> The runner can always just depend on the sorter to do it the legacy way
>> by class matching; it shouldn't incur other dependency penalties... but now
>> that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
>> price to pay for a user in any event. Are those Hadoop deps reasonably
>> self-contained?
>>
>
> Nice catch, Kenn! This is indeed why we didn't originally include the
> Sorter in core. The Hadoop deps have an enormous surface, or did at the
> time.
>
> Dan
>
>
>>
>> Kenn
>>
>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>> executed via portability since the Runner will be able to perform
>>> PTransform replacement and optimization based upon the URN of the transform
>>> and its payload so it would never need to have the "Sorter" class in its
>>> classpath.
>>>
>>> I'm ambivalent about whether merging it now is worth it.
>>>
>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek <[email protected]>
>>> wrote:
>>>
>>>> We can always fall back to the External sorter in case of merging
>>>> windows. I reckon in this case, values usually fit in memory, so it would
>>>> not be an issue.
>>>>
>>>> In the case of non-merging windows, the runner implementation would
>>>> probably also need to group elements by window during the shuffle.
>>>>
>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> One concern would be merging windows. This happens after shuffle, so
>>>>> even if the shuffle were sorted you would need to do a sorted merge of two
>>>>> sorted buffers.
>>>>>
>>>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I want to summarize my thoughts on per-key value sorting.
>>>>>>
>>>>>> Currently we have a separate module for the sorting extension. The
>>>>>> extension contains the *SortValues* transformation and implementations
>>>>>> of different sorters.
>>>>>>
>>>>>> Performance-wise, it would be great to be able to *delegate sorting
>>>>>> to a runner* if it supports sort-based shuffle. To do so, we should
>>>>>> *move the SortValues transformation to sdks-java-core*, so a runner
>>>>>> can easily provide its own implementation.
>>>>>>
>>>>>> The robust implementation is needed mainly for building HFiles for
>>>>>> the HBase bulk load. When using the external sorter, we often sort the
>>>>>> whole data set twice (the shuffle may already have done the job).
>>>>>>
>>>>>> SortValues cannot use a custom comparator, because we want to be able
>>>>>> to push the sorting logic down to a byte-based shuffle.
>>>>>>
>>>>>> The usage of the SortValues transformation is a little bit confusing. I
>>>>>> think we should add a *SortValues.perKey* method, which accepts a
>>>>>> secondary key extractor and coder, as the usage would be easier to
>>>>>> understand. This also states explicitly that we sort values
>>>>>> *perKey* only and that we sort using an *encoded secondary key*.
>>>>>> Example usage:
>>>>>>
>>>>>>
>>>>>> PCollection<KV<String, Long>> input = ...;
>>>>>> input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()));
>>>>>>
>>>>>> What do you think? Is this the right direction?
>>>>>>
>>>>>> Thanks for the comments!
>>>>>>
>>>>>> Links:
>>>>>> -
>>>>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>>>>
>>>>>