One concern would be merging windows. Window merging happens after the shuffle, so even if the shuffle output were sorted, you would still need to do a sorted merge of two sorted buffers.
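A sorted merge of two sorted buffers runs in linear time. The following is a minimal Java sketch. It assumes that each window's values are already buffered in memory as lists sorted by the same comparator. `SortedMerge` and `merge` are hypothetical names and are not part of Beam:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper: linear-time merge of two already-sorted buffers. */
final class SortedMerge {
  private SortedMerge() {}

  /**
   * Merges two lists, each sorted according to {@code cmp}, into one sorted list.
   * Runs in O(a.size() + b.size()). Ties are taken from {@code a} first, so the merge is stable.
   */
  static <T> List<T> merge(List<T> a, List<T> b, Comparator<? super T> cmp) {
    List<T> out = new ArrayList<>(a.size() + b.size());
    int i = 0;
    int j = 0;
    while (i < a.size() && j < b.size()) {
      // Take from b only when it is strictly smaller, so equal elements of a stay first.
      if (cmp.compare(b.get(j), a.get(i)) < 0) {
        out.add(b.get(j++));
      } else {
        out.add(a.get(i++));
      }
    }
    // At most one of these loops copies anything; the remaining tail is already sorted.
    while (i < a.size()) {
      out.add(a.get(i++));
    }
    while (j < b.size()) {
      out.add(b.get(j++));
    }
    return out;
  }
}
```

In the merging-windows case, `a` and `b` would be the sorted value runs of the two windows being merged. This avoids a full re-sort, but the runner still has to materialize or stream both runs.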
On Wed, Oct 17, 2018 at 2:08 PM David Morávek wrote:
> Hello,
>
> I want to summarize my thoughts on per-key value sorting.
>
> Currently we have a separate module for the sorting extension. The extension
> contains the *SortValues* transformation and implementations of different
> sorters.
>
> Performance-wise, it would be great to be able *to delegate sorting to a
> runner* if it supports sort-based shuffle. In order to do so, we should *move
> the SortValues transformation to sdks-java-core*, so a runner can easily
> provide its own implementation.
>
> A robust implementation is needed mainly for building HFiles for
> HBase bulk load. When using the external sorter, we often sort the whole data
> set twice (the shuffle may have already done the job).
>
> SortValues cannot use a custom comparator, because we want to be able to
> push the sorting logic down to a byte-based shuffle.
>
> The usage of the SortValues transformation is a little bit confusing. I think we
> should add a *SortValues.perKey* method, which accepts a secondary key
> extractor and coder, as the usage would be easier to understand. This also
> states explicitly that we sort values *per key* only and that we sort
> using an *encoded secondary key*. Example usage:
>
> PCollection<KV<String, Long>> input = ...;
> input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()));
>
> What do you think? Is this the right direction?
>
> Thanks for the comments!
>
> Links:
> - http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
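The following minimal Java sketch shows why sorting on an *encoded* secondary key fits a byte-based shuffle, and what that implies. It rests on two assumptions. First, the coder writes a long as 8 big-endian two's-complement bytes; `ByteBuffer.putLong` stands in for that coder here. Second, the shuffle compares keys lexicographically as unsigned bytes. `EncodedKeyOrder` and its methods are hypothetical:

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: ordering of big-endian encoded longs under a raw-bytes sort. */
final class EncodedKeyOrder {
  private EncodedKeyOrder() {}

  /** Encodes a long as 8 big-endian bytes (ByteBuffer's default byte order). */
  static byte[] encodeBigEndian(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  /** Lexicographic comparison treating each byte as unsigned (0..255), like a byte-based sort. */
  static int compareUnsigned(byte[] x, byte[] y) {
    int n = Math.min(x.length, y.length);
    for (int k = 0; k < n; k++) {
      int c = Integer.compare(x[k] & 0xFF, y[k] & 0xFF);
      if (c != 0) {
        return c;
      }
    }
    // A shorter array that is a prefix of the longer one sorts first.
    return Integer.compare(x.length, y.length);
  }
}
```

Under these assumptions, unsigned byte order matches numeric order for non-negative values, so 5 sorts before 300. A negative value such as -1 encodes to all `0xFF` bytes and therefore sorts after every non-negative value. The order a runner produces is thus the order of the encoded bytes, and the coder passed as the secondary key coder determines the resulting sort order.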
