A related approach to Robert's that does not involve new types is to alter coder inference from the current:

1. Ask for a coder for type T
2. Check that the coder is (order preserving / deterministic)

To:

1. Ask for an order preserving coder for T / ask for a deterministic coder for T

This would allow recursive search for a list or KV coder that is order preserving. This could be implemented as a parallel code path in CoderRegistry without other changes, and invoked by transforms, even before any global changes to how coders are inferred.

We'd have to be careful about pipeline upgrade compatibility.

Kenn

On Mon, Oct 22, 2018 at 12:40 PM David Morávek wrote:

> Lukasz, you are right. I didn't think about structured coders. Thanks
>
> On Mon, Oct 22, 2018 at 7:40 PM Lukasz Cwik wrote:
>
>> I don't believe an interface will work because KvCoder/ListCoder/...
>> would only be order preserving if their component coders were order
>> preserving.
>>
>> On Mon, Oct 22, 2018 at 8:52 AM David Morávek wrote:
>>
>>> What should be the next step? I guess we all agree that the hadoop
>>> dependency should be split out. Then we're left with the SortValues
>>> transform + in memory implementation. I'm ok with keeping this as a
>>> separate module, as this would discourage users from using sorting in
>>> their business logic.
>>>
>>> Robert:
>>> Regarding the introduction of a new method for the coders: how about
>>> creating a new interface, e.g. *OrderPreservingCoder*? Then you can
>>> require this interface in your method signature and the IDE will
>>> autocomplete all of the possible implementations that you can use. In
>>> case of a new method, the user needs to know which implementations are
>>> order preserving, and it can be really confusing. I think the same
>>> thinking should apply to other coder properties.
>>>
>>> D.
>>>
>>> On Thu, Oct 18, 2018 at 12:15 PM Niel Markwick wrote:
>>>
>>>> FYI: the BufferedExternalSorter depends on Hadoop client libraries
>>>> (specifically hadoop_mapreduce_client_core and hadoop_common), but not on
>>>> the Hadoop service -- because the ExternalSorter
>>>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java>
>>>> uses Hadoop's SequenceFile
>>>> <http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/io/SequenceFile.html>
>>>> for on-disk sorting.
>>>>
>>>> On Thu, 18 Oct 2018 at 11:19 David Morávek wrote:
>>>>
>>>>> Kenn, I believe we should not introduce a hadoop dependency to either
>>>>> sdks or runners. We may split sorting into two packages, one with the
>>>>> transformation + in memory implementation (this is the part I'd love
>>>>> to see become part of sdks-java-core) and a second module with a more
>>>>> robust external sorter (with the hadoop dep).
>>>>>
>>>>> Does this make sense?
>>>>>
>>>>> On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin wrote:
>>>>>
>>>>>> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles wrote:
>>>>>>
>>>>>>> The runner can always just depend on the sorter to do it the legacy
>>>>>>> way by class matching; it shouldn't incur other dependency
>>>>>>> penalties... but now that I look briefly, the sorter depends on
>>>>>>> Hadoop bits. That seems a heavy price to pay for a user in any
>>>>>>> event. Are those Hadoop deps reasonably self-contained?
>>>>>>
>>>>>> Nice catch, Kenn! This is indeed why we didn't originally include the
>>>>>> Sorter in core. The Hadoop deps have an enormous surface, or did at
>>>>>> the time.
>>>>>>
>>>>>> Dan
>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik wrote:
>>>>>>>
>>>>>>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>>>>>>> executed via portability since the Runner will be able to perform
>>>>>>>> PTransform replacement and optimization based upon the URN of the
>>>>>>>> transform and its payload so it would never need to have the
>>>>>>>> "Sorter" class in its classpath.
>>>>>>>>
>>>>>>>> I'm ambivalent about whether merging it now is worth it.
>>>>>>>>
>>>>>>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek wrote:
>>>>>>>>
>>>>>>>>> We can always fall back to the External sorter in case of merging
>>>>>>>>> windows. I reckon in this case, values usually fit in memory, so
>>>>>>>>> it would not be an issue.
>>>>>>>>>
>>>>>>>>> In case of non-merging windows, the runner implementation would
>>>>>>>>> probably need to group elements also by window during shuffle.
>>>>>>>>>
>>>>>>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax wrote:
>>>>>>>>>
>>>>>>>>>> One concern would be merging windows. This happens after shuffle,
>>>>>>>>>> so even if the shuffle were sorted you would need to do a sorted
>>>>>>>>>> merge of two sorted buffers.
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> I want to summarize my thoughts on the per key value sorting.
>>>>>>>>>>>
>>>>>>>>>>> Currently we have a separate module for the sorting extension.
>>>>>>>>>>> The extension contains the *SortValues* transformation and
>>>>>>>>>>> implementations of different sorters.
>>>>>>>>>>>
>>>>>>>>>>> Performance-wise it would be great to be able to *delegate
>>>>>>>>>>> sorting to a runner* if it supports sort based shuffle. In
>>>>>>>>>>> order to do so, we should *move SortValues transformation to
>>>>>>>>>>> sdks-java-core*, so a runner can easily provide its own
>>>>>>>>>>> implementation.
>>>>>>>>>>>
>>>>>>>>>>> The robust implementation is needed mainly for building
>>>>>>>>>>> HFiles for the HBase bulk load. When using the external sorter,
>>>>>>>>>>> we often sort the whole data set twice (the shuffle may already
>>>>>>>>>>> have done the job).
>>>>>>>>>>>
>>>>>>>>>>> SortValues cannot use a custom comparator, because we want to be
>>>>>>>>>>> able to push sorting logic down to a byte based shuffle.
>>>>>>>>>>>
>>>>>>>>>>> The usage of the SortValues transformation is a little bit
>>>>>>>>>>> confusing. I think we should add a *SortValues.perKey* method,
>>>>>>>>>>> which accepts a secondary key extractor and coder, as the usage
>>>>>>>>>>> would be easier to understand. Also, this explicitly states that
>>>>>>>>>>> we sort values *perKey* only and that we sort using an *encoded
>>>>>>>>>>> secondary key*.
>>>>>>>>>>>
>>>>>>>>>>> Example usage:
>>>>>>>>>>>
>>>>>>>>>>> *PCollection<KV<String, Long>> input = ...;*
>>>>>>>>>>> *input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()))*
>>>>>>>>>>>
>>>>>>>>>>> What do you think? Is this the right direction?
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>>
>>>>>>>>>>> Links:
>>>>>>>>>>> - http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>>>>>>>>>
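
The lookup proposed at the top of this message (ask directly for an order preserving coder for T, recursing into KV and list components) could look roughly like the sketch below. It is a toy model in plain Java, not Beam code: `OrderPreservingLookup`, `TypeSpec`, and the coder names are hypothetical, and it assumes a KV or list coder is order preserving exactly when every one of its component coders is.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy model of the proposed lookup; none of these names are Beam APIs. */
class OrderPreservingLookup {

  /** Hypothetical stand-in for a type: a raw name plus its type arguments. */
  static final class TypeSpec {
    final String name;
    final List<TypeSpec> args;

    TypeSpec(String name, TypeSpec... args) {
      this.name = name;
      this.args = Arrays.asList(args);
    }
  }

  // Leaf type name -> name of a coder that is assumed to be order preserving.
  private final Map<String, String> leafCoders = new HashMap<>();

  void registerOrderPreserving(String typeName, String coderName) {
    leafCoders.put(typeName, coderName);
  }

  /**
   * Asks directly for an order preserving coder for the given type, instead of
   * inferring some coder first and checking the property afterwards.
   */
  Optional<String> find(TypeSpec type) {
    if (type.args.isEmpty()) {
      return Optional.ofNullable(leafCoders.get(type.name));
    }
    // Only KV and List are treated as structured types in this sketch.
    if (!type.name.equals("KV") && !type.name.equals("List")) {
      return Optional.empty();
    }
    // Recurse: the structured coder qualifies only if every component does.
    List<String> parts = new ArrayList<>();
    for (TypeSpec arg : type.args) {
      Optional<String> component = find(arg);
      if (!component.isPresent()) {
        return Optional.empty();
      }
      parts.add(component.get());
    }
    return Optional.of(type.name + "Coder<" + String.join(", ", parts) + ">");
  }

  public static void main(String[] args) {
    OrderPreservingLookup registry = new OrderPreservingLookup();
    registry.registerOrderPreserving("String", "OrderedStringCoder");
    registry.registerOrderPreserving("Long", "OrderedLongCoder");

    TypeSpec kv = new TypeSpec("KV", new TypeSpec("String"), new TypeSpec("Long"));
    TypeSpec list = new TypeSpec("List", new TypeSpec("Double"));

    // Both components are registered, so a composite coder is found.
    System.out.println(registry.find(kv));
    // No order preserving coder is registered for Double, so the search fails.
    System.out.println(registry.find(list));
  }
}
```

Because the search fails as soon as one component has no order preserving coder, a transform calling such a path would get a clear "no suitable coder" answer up front rather than a coder that fails a property check later.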
