Hello,

I want to summarize my thoughts on per-key value sorting.
Currently we have a separate module for the sorting extension. The extension contains the *SortValues* transformation and implementations of different sorters.

Performance-wise, it would be great to *be able to delegate sorting to a runner* if it supports sort-based shuffle. In order to do so, we should *move the SortValues transformation to sdks-java-core*, so a runner can easily provide its own implementation.

A robust implementation is needed mainly for building HFiles for the HBase bulk load. When using an external sorter, we often sort the whole data set twice (the shuffle may have already done the job).

SortValues cannot use a custom comparator, because we want to be able to push the sorting logic down to a byte-based shuffle.

The usage of the SortValues transformation is a little bit confusing. I think we should add a *SortValues.perKey* method, which accepts a secondary key extractor and a coder, as the usage would be easier to understand. Also, this explicitly states that we sort values *perKey* only and that we sort using an *encoded secondary key*.

Example usage:

*PCollection<KV<String, Long>> input = ...;*
*input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()))*

What do you think? Is this the right direction?

Thanks for the comments!

Links:
- http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
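
To illustrate the *encoded secondary key* point, here is a minimal sketch in plain Java (no Beam dependency) of what a byte-based shuffle would effectively do: compare the encoded keys lexicographically as unsigned bytes. It assumes the secondary key is encoded as a fixed 8-byte big-endian long, which is what the name BigEndianLongCoder suggests. The class and method names (EncodedKeyOrder, encodeBigEndian, sortByEncodedBytes) are hypothetical and exist only for this example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Beam: shows how sorting on encoded bytes behaves.
final class EncodedKeyOrder {

  // Encodes a long as 8 bytes, most significant byte first
  // (ByteBuffer uses big-endian order by default).
  static byte[] encodeBigEndian(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  static long decodeBigEndian(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  // Lexicographic comparison treating each byte as unsigned,
  // the way a byte-based shuffle would compare raw keys.
  static int compareUnsignedBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(Byte.toUnsignedInt(a[i]), Byte.toUnsignedInt(b[i]));
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  // Sorts the values purely by their encoded form, never by a custom comparator.
  static long[] sortByEncodedBytes(long... values) {
    List<byte[]> encoded = new ArrayList<>();
    for (long v : values) {
      encoded.add(encodeBigEndian(v));
    }
    encoded.sort(EncodedKeyOrder::compareUnsignedBytes);
    long[] sorted = new long[encoded.size()];
    for (int i = 0; i < sorted.length; i++) {
      sorted[i] = decodeBigEndian(encoded.get(i));
    }
    return sorted;
  }

  public static void main(String[] args) {
    // Non-negative values: byte order matches numeric order.
    System.out.println(Arrays.toString(sortByEncodedBytes(42L, 7L, 1000L))); // [7, 42, 1000]
    // A negative value has its sign bit set, so it sorts after the positives.
    System.out.println(Arrays.toString(sortByEncodedBytes(5L, -1L, 0L))); // [0, 5, -1]
  }
}
```

The second call shows the trade-off of not allowing a custom comparator: the resulting order is whatever the encoding implies. With a plain big-endian two's-complement encoding, negative values sort after non-negative ones, so whoever picks the coder for the secondary key has to make sure its byte order matches the order they actually want.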
