I like the idea of asking for a coder for T with properties X. (E.g. the order-preserving one may not be the most efficient, so it makes a poor default, but it is required in some cases.)
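To make that concrete, here is a minimal sketch of property-based lookup. Everything in it is hypothetical (the `CoderLookupSketch` class, the `Property` enum, `encoderFor`, and both toy encodings); none of it is Beam's `CoderRegistry` API. It registers two encodings for longs, a compact varint and a fixed-width sign-flipped big-endian one, and hands back the first candidate that guarantees the requested properties.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch only: these names are illustrative, not Beam APIs. */
public final class CoderLookupSketch {
  public enum Property { DETERMINISTIC, ORDER_PRESERVING }

  /** Compact base-128 varint; its byte order does not follow numeric order. */
  public static byte[] varint(long v) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((v & ~0x7FL) != 0) {
      out.write((int) ((v & 0x7F) | 0x80)); // low 7 bits plus continuation bit
      v >>>= 7;
    }
    out.write((int) v);
    return out.toByteArray();
  }

  /** Fixed 8 bytes, big-endian, sign bit flipped: unsigned byte order matches numeric order. */
  public static byte[] orderPreserving(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
  }

  // Candidates for Long, most compact first, each with the properties it guarantees.
  private static final Map<Function<Long, byte[]>, Set<Property>> LONG_ENCODERS =
      new LinkedHashMap<>();

  static {
    LONG_ENCODERS.put(CoderLookupSketch::varint, EnumSet.of(Property.DETERMINISTIC));
    LONG_ENCODERS.put(
        CoderLookupSketch::orderPreserving,
        EnumSet.of(Property.DETERMINISTIC, Property.ORDER_PRESERVING));
  }

  /** Returns the first registered encoder that guarantees every required property. */
  public static Function<Long, byte[]> encoderFor(Set<Property> required) {
    for (Map.Entry<Function<Long, byte[]>, Set<Property>> e : LONG_ENCODERS.entrySet()) {
      if (e.getValue().containsAll(required)) {
        return e.getKey();
      }
    }
    throw new IllegalArgumentException("no encoder with properties " + required);
  }

  public static void main(String[] args) {
    Function<Long, byte[]> compact = encoderFor(EnumSet.of(Property.DETERMINISTIC));
    Function<Long, byte[]> sortable = encoderFor(EnumSet.of(Property.ORDER_PRESERVING));

    System.out.println("compact bytes for 129:  " + compact.apply(129L).length);
    System.out.println("sortable bytes for 129: " + sortable.apply(129L).length);

    // Compare the encodings of 129 and 256 as unsigned bytes, the way a byte-based sort would.
    boolean compactKeepsOrder =
        Arrays.compareUnsigned(compact.apply(129L), compact.apply(256L)) < 0;
    boolean sortableKeepsOrder =
        Arrays.compareUnsigned(sortable.apply(129L), sortable.apply(256L)) < 0;
    System.out.println("compact keeps 129 < 256:  " + compactKeepsOrder);
    System.out.println("sortable keeps 129 < 256: " + sortableKeepsOrder);
  }
}
```

With this registration order, a plain request gets the compact varint, and only callers that explicitly ask for `ORDER_PRESERVING` pay for the fixed eight bytes.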
Note that if we go the route of secondary-key-extraction, we don't even need a full coder here, just an order-preserving encoding. (This has, as mentioned, the disadvantage of shuffling possible redundancy between the order-providing key and the actual value.)

On Mon, Oct 22, 2018 at 9:46 PM Kenneth Knowles <[email protected]> wrote:

> A related approach to Robert's that does not involve new types is to alter
> coder inference from the current:
>
> 1. Ask for a coder for type T
> 2. Check that the coder is (order preserving / deterministic)
>
> To:
>
> 1. Ask for an order preserving coder for T / ask for a deterministic coder
> for T
>
> This would allow recursive search for a list or KV coder that is order
> preserving. This could be implemented as a parallel code path in
> CoderRegistry without other changes, and invoked by transforms, even before
> any global changes to how coders are inferred. We'd have to be careful
> about pipeline upgrade compatibility.
>
> Kenn
>
> On Mon, Oct 22, 2018 at 12:40 PM David Morávek <[email protected]> wrote:
>
>> Lukasz, you are right. I didn't think about structured coders. Thanks
>>
>> On Mon, Oct 22, 2018 at 7:40 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> I don't believe an interface will work because KvCoder/ListCoder/...
>>> would only be order preserving if their component coders were order
>>> preserving.
>>>
>>> On Mon, Oct 22, 2018 at 8:52 AM David Morávek <[email protected]> wrote:
>>>
>>>> What should be the next step? I guess we all agree that the hadoop
>>>> dependency should be split out. Then we're left with the SortValues
>>>> transform + in memory implementation. I'm ok with keeping this as a
>>>> separate module, as this would discourage users from using sorting in
>>>> their business logic.
>>>>
>>>> Robert:
>>>> Regarding the introduction of a new method for the coders: how about
>>>> creating a new interface, e.g. *OrderPreservingCoder*? Then you can
>>>> require this interface in your method signature and the IDE will
>>>> autocomplete all of the possible implementations that you can use. In
>>>> case of a new method, the user needs to know which implementations are
>>>> order preserving and it can be really confusing. I think the same
>>>> thinking should apply to other coder properties.
>>>>
>>>> D.
>>>>
>>>> On Thu, Oct 18, 2018 at 12:15 PM Niel Markwick <[email protected]> wrote:
>>>>
>>>>> FYI: the BufferedExternalSorter depends on Hadoop client libraries
>>>>> (specifically hadoop_mapreduce_client_core and hadoop_common), but not on
>>>>> the Hadoop service -- because the ExternalSorter
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java>
>>>>> uses Hadoop's SequenceFile
>>>>> <http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/io/SequenceFile.html>
>>>>> for on-disk sorting.
>>>>>
>>>>> On Thu, 18 Oct 2018 at 11:19 David Morávek <[email protected]> wrote:
>>>>>
>>>>>> Kenn, I believe we should not introduce a hadoop dependency to either
>>>>>> sdks or runners. We may split sorting in two packages, one with the
>>>>>> transformation + in memory implementation (this is the part I'd love to
>>>>>> see become part of sdks-java-core) and a second module with a more
>>>>>> robust external sorter (with hadoop dep).
>>>>>>
>>>>>> Does this make sense?
>>>>>>
>>>>>> On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin <[email protected]> wrote:
>>>>>>
>>>>>>> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>>
>>>>>>>> The runner can always just depend on the sorter to do it the legacy
>>>>>>>> way by class matching; it shouldn't incur other dependency
>>>>>>>> penalties... but now that I look briefly, the sorter depends on
>>>>>>>> Hadoop bits. That seems a heavy price to pay for a user in any
>>>>>>>> event. Are those Hadoop deps reasonably self-contained?
>>>>>>>>
>>>>>>>
>>>>>>> Nice catch, Kenn! This is indeed why we didn't originally include
>>>>>>> the Sorter in core. The Hadoop deps have an enormous surface, or did
>>>>>>> at the time.
>>>>>>>
>>>>>>> Dan
>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>>>>>>>> executed via portability since the Runner will be able to perform
>>>>>>>>> PTransform replacement and optimization based upon the URN of the
>>>>>>>>> transform and its payload so it would never need to have the
>>>>>>>>> "Sorter" class in its classpath.
>>>>>>>>>
>>>>>>>>> I'm ambivalent about whether merging it now is worth it.
>>>>>>>>>
>>>>>>>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> We can always fall back to the External sorter in case of merging
>>>>>>>>>> windows. I reckon in this case, values usually fit in memory, so it
>>>>>>>>>> would not be an issue.
>>>>>>>>>>
>>>>>>>>>> In case of non-merging windows, the runner implementation would
>>>>>>>>>> probably need to group elements also by window during shuffle.
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> One concern would be merging windows. This happens after
>>>>>>>>>>> shuffle, so even if the shuffle were sorted you would need to do a
>>>>>>>>>>> sorted merge of two sorted buffers.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I want to summarize my thoughts on the per key value sorting.
>>>>>>>>>>>>
>>>>>>>>>>>> Currently we have a separate module for the sorting extension. The
>>>>>>>>>>>> extension contains the *SortValues* transformation and
>>>>>>>>>>>> implementations of different sorters.
>>>>>>>>>>>>
>>>>>>>>>>>> Performance-wise it would be great to be able *to delegate
>>>>>>>>>>>> sorting to a runner* if it supports sort based shuffle. In
>>>>>>>>>>>> order to do so, we should *move SortValues transformation to
>>>>>>>>>>>> sdks-java-core*, so a runner can easily provide its own
>>>>>>>>>>>> implementation.
>>>>>>>>>>>>
>>>>>>>>>>>> The robust implementation is needed mainly for building
>>>>>>>>>>>> HFiles for the HBase bulk load. When using the external sorter, we
>>>>>>>>>>>> often sort the whole data set twice (the shuffle may already have
>>>>>>>>>>>> done the job).
>>>>>>>>>>>>
>>>>>>>>>>>> SortValues cannot use a custom comparator, because we want to be
>>>>>>>>>>>> able to push sorting logic down to a byte based shuffle.
>>>>>>>>>>>>
>>>>>>>>>>>> The usage of the SortValues transformation is a little bit confusing.
>>>>>>>>>>>> I think we should add a *SortValues.perKey* method, which
>>>>>>>>>>>> accepts a secondary key extractor and coder, as the usage would be
>>>>>>>>>>>> easier to understand. Also, this explicitly states that we sort
>>>>>>>>>>>> values *perKey* only and that we sort using an *encoded secondary key*.
>>>>>>>>>>>> Example usage:
>>>>>>>>>>>>
>>>>>>>>>>>> *PCollection<KV<String, Long>> input = ...;*
>>>>>>>>>>>> *input.apply(SortValues.perKey(KV::getValue,
>>>>>>>>>>>> BigEndianLongCoder.of()))*
>>>>>>>>>>>>
>>>>>>>>>>>> What do you think? Is this the right direction?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>>>
>>>>>>>>>>>> Links:
>>>>>>>>>>>> - http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
