Kenn, I believe we should not introduce a Hadoop dependency to either the SDKs or the runners. We could split sorting into two packages: one with the transformation plus the in-memory implementation (this is the part I'd love to see become part of sdks-java-core), and a second module with the more robust external sorter (with the Hadoop dependency).
Does this make sense?

On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin <[email protected]> wrote:

> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles <[email protected]> wrote:
>
>> The runner can always just depend on the sorter to do it the legacy way
>> by class matching; it shouldn't incur other dependency penalties... but now
>> that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
>> price to pay for a user in any event. Are those Hadoop deps reasonably
>> self-contained?
>>
>
> Nice catch, Kenn! This is indeed why we didn't originally include the
> Sorter in core. The Hadoop deps have an enormous surface, or did at the
> time.
>
> Dan
>
>
>>
>> Kenn
>>
>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>> executed via portability, since the Runner will be able to perform
>>> PTransform replacement and optimization based upon the URN of the transform
>>> and its payload, so it would never need to have the "Sorter" class in its
>>> classpath.
>>>
>>> I'm ambivalent about whether merging it now is worth it.
>>>
>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek <[email protected]>
>>> wrote:
>>>
>>>> We can always fall back to the external sorter in case of merging
>>>> windows. I reckon in this case, values usually fit in memory, so it would
>>>> not be an issue.
>>>>
>>>> In case of non-merging windows, the runner implementation would probably
>>>> need to also group elements by window during the shuffle.
>>>>
>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> One concern would be merging windows. This happens after shuffle, so
>>>>> even if the shuffle were sorted you would need to do a sorted merge of two
>>>>> sorted buffers.
>>>>>
>>>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I want to summarize my thoughts on per-key value sorting.
>>>>>>
>>>>>> Currently we have a separate module for the sorting extension. The
>>>>>> extension contains the *SortValues* transformation and implementations
>>>>>> of different sorters.
>>>>>>
>>>>>> Performance-wise it would be great to be able *to delegate sorting
>>>>>> to a runner* if it supports sort-based shuffle. In order to do so,
>>>>>> we should *move SortValues transformation to sdks-java-core*, so a
>>>>>> runner can easily provide its own implementation.
>>>>>>
>>>>>> The robust implementation is needed mainly for building HFiles for
>>>>>> the HBase bulk load. When using the external sorter, we often sort the
>>>>>> whole data set twice (the shuffle may already have done the job).
>>>>>>
>>>>>> SortValues cannot use a custom comparator, because we want to be able
>>>>>> to push the sorting logic down to a byte-based shuffle.
>>>>>>
>>>>>> The usage of the SortValues transformation is a little bit confusing. I
>>>>>> think we should add a *SortValues.perKey* method, which accepts a
>>>>>> secondary key extractor and coder, as the usage would be easier to
>>>>>> understand. Also, this explicitly states that we sort values
>>>>>> *perKey* only and that we sort using an *encoded secondary key*.
>>>>>> Example usage:
>>>>>>
>>>>>>
>>>>>> *PCollection<KV<String, Long>> input = ...;*
>>>>>> *input.apply(SortValues.perKey(KV::getValue,
>>>>>> BigEndianLongCoder.of()))*
>>>>>>
>>>>>> What do you think? Is this the right direction?
>>>>>>
>>>>>> Thanks for the comments!
>>>>>>
>>>>>> Links:
>>>>>> -
>>>>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>>>>
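For illustration only, here is a minimal sketch of what the dependency-free, in-memory half of the split might look like: the values of one key are ordered by the unsigned lexicographic order of their encoded secondary key, which is the ordering a byte-based shuffle could also provide. The class and method names (InMemorySortSketch, encodeBigEndian, sortValues) are hypothetical and not part of Beam; encodeBigEndian is only a stand-in for a coder such as BigEndianLongCoder, and the sketch uses nothing beyond the JDK.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of an in-memory per-key value sort; not the Beam API. */
final class InMemorySortSketch {

  /** Stand-in for a coder: encodes a long as 8 big-endian bytes. */
  static byte[] encodeBigEndian(long value) {
    // ByteBuffer writes in big-endian order by default.
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  /** Compares two byte arrays lexicographically, treating bytes as unsigned. */
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    // A shorter array that is a prefix of the longer one sorts first.
    return Integer.compare(a.length, b.length);
  }

  /**
   * Sorts the values of a single key by their encoded secondary key.
   * No custom comparator is involved: the order is defined by the bytes alone.
   */
  static <V> List<V> sortValues(Iterable<V> values, Function<V, byte[]> encodedSecondaryKey) {
    List<V> sorted = new ArrayList<>();
    values.forEach(sorted::add);
    Comparator<byte[]> byteOrder = InMemorySortSketch::compareUnsigned;
    sorted.sort(Comparator.comparing(encodedSecondaryKey, byteOrder));
    return sorted;
  }
}
```

Calling sortValues(Arrays.asList(300L, 2L, 256L, 1L), InMemorySortSketch::encodeBigEndian) orders the values as 1, 2, 256, 300. One consequence of sorting on raw bytes is worth keeping in mind: with this plain two's-complement encoding, negative longs sort after non-negative ones, so the choice of coder determines the resulting order.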
