The runner can always just depend on the sorter to do it the legacy way by class matching; it shouldn't incur other dependency penalties... but now that I look briefly, the sorter depends on Hadoop bits. That seems a heavy price to pay for a user in any event. Are those Hadoop deps reasonably self-contained?
Kenn On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik <[email protected]> wrote: > Merging the sorter into sdks-java-core isn't needed for pipelines executed > via portability since the Runner will be able to perform PTransform > replacement and optimization based upon the URN of the transform and its > payload so it would never need to have the "Sorter" class in its classpath. > > I'm ambivalent about whether merging it now is worth it. > > On Wed, Oct 17, 2018 at 2:31 PM David Morávek <[email protected]> > wrote: > >> We can always fall back to the External sorter in case of merging >> windows. I reckon in this case, values usually fit in memory, so it would >> not be an issue. >> >> In case of non-merging windows, runner implementation would probably >> require to group elements also by window during shuffle. >> >> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <[email protected]> wrote: >> >>> One concern would be merging windows. This happens after shuffle, so >>> even if the shuffle were sorted you would need to do a sorted merge of two >>> sorted buffers. >>> >>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <[email protected]> >>> wrote: >>> >>>> Hello, >>>> >>>> I want to summarize my thoughts on the per key value sorting. >>>> >>>> Currently we have a separate module for sorting extension. The >>>> extension contains *SortValues* transformation and implementations of >>>> different sorters. >>>> >>>> Performance-wise it would be great to be able* to delegate sorting to >>>> a runner* if it supports sort based shuffle. In order to do so, we >>>> should *move SortValues transformation to sdks-java-core*, so a runner >>>> can easily provide its own implementation. >>>> >>>> The robust implementation is needed mainly for building of HFiles for >>>> the HBase bulk load. When using external sorter, we often sort the whole >>>> data set twice (shuffle may already did a job). >>>> >>>> SortValues can not use custom comparator, because we want to be able to >>>> push sorting logic down to a byte based shuffle. >>>> >>>> The usage of SortValues transformation is little bit confusing. I think >>>> we should add a *SortValues.perKey* method, which accepts a secondary >>>> key extractor and coder, as the usage would be easier to understand. Also, >>>> this explicitly states, that we sort values *perKey* only and that we >>>> sort using an *encoded secondary key*. Example usage: >>>> >>>> >>>> *PCollection<KV<String, Long>> input = ...;* >>>> *input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()))* >>>> >>>> What do you think? Is this the right direction? >>>> >>>> Thanks for the comments! >>>> >>>> Links: >>>> - >>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E >>>> >>>
