FYI: the BufferedExternalSorter depends on Hadoop client libraries (specifically hadoop_mapreduce_client_core and hadoop_common), but not on the Hadoop service -- because the ExternalSorter <https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java> uses Hadoop's SequenceFile <http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/io/SequenceFile.html> for on-disk sorting.
On Thu, 18 Oct 2018 at 11:19 David Morávek <[email protected]> wrote:

> Kenn, I believe we should not introduce hadoop dependency to neither sdks
> or runners. We may split sorting in two packages, one with the
> transformation + in memory implementation (this is the part I'd love to see
> become part of sdks-java-core) and second module with more robust external
> sorter (with hadoop dep).
>
> Does this make sense?
>
>
> On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin <[email protected]> wrote:
>
>> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> The runner can always just depend on the sorter to do it the legacy way
>>> by class matching; it shouldn't incur other dependency penalties... but now
>>> that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
>>> price to pay for a user in any event. Are those Hadoop deps reasonably
>>> self-contained?
>>>
>>
>> Nice catch, Kenn! This is indeed why we didn't originally include the
>> Sorter in core. The Hadoop deps have an enormous surface, or did at the
>> time.
>>
>> Dan
>>
>>
>>>
>>> Kenn
>>>
>>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>>> executed via portability since the Runner will be able to perform
>>>> PTransform replacement and optimization based upon the URN of the transform
>>>> and its payload so it would never need to have the "Sorter" class in its
>>>> classpath.
>>>>
>>>> I'm ambivalent about whether merging it now is worth it.
>>>>
>>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek <[email protected]> wrote:
>>>>
>>>>> We can always fall back to the External sorter in case of merging
>>>>> windows. I reckon in this case, values usually fit in memory, so it would
>>>>> not be an issue.
>>>>>
>>>>> In case of non-merging windows, runner implementation would probably
>>>>> require to group elements also by window during shuffle.
>>>>>
>>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> One concern would be merging windows. This happens after shuffle, so
>>>>>> even if the shuffle were sorted you would need to do a sorted merge of
>>>>>> two sorted buffers.
>>>>>>
>>>>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <[email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I want to summarize my thoughts on the per key value sorting.
>>>>>>>
>>>>>>> Currently we have a separate module for sorting extension. The
>>>>>>> extension contains *SortValues* transformation and implementations
>>>>>>> of different sorters.
>>>>>>>
>>>>>>> Performance-wise it would be great to be able to *delegate sorting
>>>>>>> to a runner* if it supports sort based shuffle. In order to do so,
>>>>>>> we should *move SortValues transformation to sdks-java-core*, so a
>>>>>>> runner can easily provide its own implementation.
>>>>>>>
>>>>>>> The robust implementation is needed mainly for building of HFiles
>>>>>>> for the HBase bulk load. When using external sorter, we often sort the
>>>>>>> whole data set twice (shuffle may already did a job).
>>>>>>>
>>>>>>> SortValues can not use custom comparator, because we want to be able
>>>>>>> to push sorting logic down to a byte based shuffle.
>>>>>>>
>>>>>>> The usage of SortValues transformation is little bit confusing. I
>>>>>>> think we should add a *SortValues.perKey* method, which accepts a
>>>>>>> secondary key extractor and coder, as the usage would be easier to
>>>>>>> understand. Also, this explicitly states, that we sort values
>>>>>>> *perKey* only and that we sort using an *encoded secondary key*.
>>>>>>> Example usage:
>>>>>>>
>>>>>>>     PCollection<KV<String, Long>> input = ...;
>>>>>>>     input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()))
>>>>>>>
>>>>>>> What do you think? Is this the right direction?
>>>>>>>
>>>>>>> Thanks for the comments!
>>>>>>>
>>>>>>> Links:
>>>>>>> - http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>>>>>
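
Side note on the "encoded secondary key" point in the original proposal: the reason a custom comparator is ruled out is that a byte-based shuffle can only compare the encoded bytes. Below is a minimal plain-Java sketch of that idea, with no Beam or Hadoop dependency. It is only an illustration under stated assumptions, not the proposed SortValues.perKey implementation: the class and method names (EncodedKeySortSketch, encodeBigEndian, sortValuesPerKey) are hypothetical, and encodeBigEndian is merely a stand-in for a big-endian long coder.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EncodedKeySortSketch {

    // Hypothetical stand-in for a big-endian long coder:
    // 8 bytes, most significant byte first (ByteBuffer's default order).
    static byte[] encodeBigEndian(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Groups values by key, then sorts each group by unsigned lexicographic
    // comparison of the encoded bytes, the only ordering a byte-based
    // shuffle could apply without knowing the value type.
    static Map<String, List<Long>> sortValuesPerKey(List<Map.Entry<String, Long>> input) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Map.Entry<String, Long> kv : input) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Comparator<Long> byEncodedBytes =
            (a, b) -> Arrays.compareUnsigned(encodeBigEndian(a), encodeBigEndian(b));
        for (List<Long> values : grouped.values()) {
            values.sort(byEncodedBytes);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input = List.of(
            Map.entry("a", 300L),
            Map.entry("b", 7L),
            Map.entry("a", 2L),
            Map.entry("a", 70000L));
        // prints {a=[2, 300, 70000], b=[7]}
        System.out.println(sortValuesPerKey(input));
    }
}
```

For non-negative longs the byte order matches the numeric order, which is why a big-endian encoding is a natural choice for the secondary key. In this sketch a negative long would sort after the positive ones, because its leading byte has the high bit set; whether that matters depends on the data and on the coder actually used.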
