I like the idea of asking for a coder for T with properties X. (E.g. the
order-preserving one may not be the the most efficient, so a poor default,
but required in some cases.)

Note that if we go the route of secondary-key-extraction, we don't even
need a full coder here, just an order-preserving encoding. (This has, as
mentioned, the disadvantage of shuffling possible redundancy between the
order-providing key and the actual value).

On Mon, Oct 22, 2018 at 9:46 PM Kenneth Knowles <[email protected]> wrote:

> A related approach to Robert's that does not involve new types is to alter
> coder inference from the current:
>
> 1. Ask for a coder for type T
> 2. Check that the coder is (order preserving / deterministic)
>
> To:
>
> 1. Ask for an order preserving coder for T / ask for a deterministic coder
> for T
>
> This would allow recursive search for a list or KV coder that is order
> preserving. This could be implemented as a parallel code path in
> CoderRegistry without other changes, and invoked by transforms, even before
> any global changes to how coders are inferred. We'd have to be careful
> about pipeline upgrade compatibility.
>
> Kenn
>
> On Mon, Oct 22, 2018 at 12:40 PM David Morávek <[email protected]>
> wrote:
>
>> Lukasz, you are right. I didn't think about structured coders. Thanks
>>
>> On Mon, Oct 22, 2018 at 7:40 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> I don't believe an interface will work because KvCoder/ListCoder/...
>>> would only be order preserving if their components coders were order
>>> preserving.
>>>
>>> On Mon, Oct 22, 2018 at 8:52 AM David Morávek <[email protected]>
>>> wrote:
>>>
>>>> What should be the next step? I guess we all agree that hadoop
>>>> dependency should be splitted out. Then we're left off with the SortValues
>>>> transform + in memory implementation. I'm ok with keeping this as a
>>>> separate module, as this would discourage users to use sorting in their
>>>> business logic.
>>>>
>>>> Robert:
>>>> ad introduction of a new method for the coders. How about creating a
>>>> new interface eg. *OrderPreservingCoder*? Than you can require this
>>>> interface in your method signature and IDE will autocomplete all of the
>>>> possible implementations that you can use. In case of a new method, user
>>>> needs to now which implementations are order preserving and it can be
>>>> really confusing. I think the same thinking should apply to other coder
>>>> properties.
>>>>
>>>> D.
>>>>
>>>>
>>>>
>>>> On Thu, Oct 18, 2018 at 12:15 PM Niel Markwick <[email protected]>
>>>> wrote:
>>>>
>>>>> FYI: the BufferedExternalSorter depends on Hadoop client libraries
>>>>> (specifically hadoop_mapreduce_client_core and hadoop_common), but not on
>>>>> the Hadoop service -- because the  ExternalSorter
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java>
>>>>> uses Hadoop's SequenceFile
>>>>> <http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/io/SequenceFile.html>
>>>>>  for
>>>>> on-disk sorting.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 18 Oct 2018 at 11:19 David Morávek <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Kenn, I believe we should not introduce hadoop dependency to neither
>>>>>> sdks or runners. We may split sorting in two packages, one with the
>>>>>> transformation + in memory implementation (this is the part I'd love to 
>>>>>> see
>>>>>> become part of sdks-java-core) and second module with more robust 
>>>>>> external
>>>>>> sorter (with hadoop dep).
>>>>>>
>>>>>> Does this make sense?
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> The runner can always just depend on the sorter to do it the legacy
>>>>>>>> way by class matching; it shouldn't incur other dependency 
>>>>>>>> penalties... but
>>>>>>>> now that I look briefly, the sorter depends on Hadoop bits. That seems 
>>>>>>>> a
>>>>>>>> heavy price to pay for a user in any event. Are those Hadoop deps
>>>>>>>> reasonably self-contained?
>>>>>>>>
>>>>>>>
>>>>>>> Nice catch, Kenn! This is indeed why we didn't originally include
>>>>>>> the Sorter in core. The Hadoop deps have an enormous surface, or did at 
>>>>>>> the
>>>>>>> time.
>>>>>>>
>>>>>>> Dan
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>>>>>>>> executed via portability since the Runner will be able to perform
>>>>>>>>> PTransform replacement and optimization based upon the URN of the 
>>>>>>>>> transform
>>>>>>>>> and its payload so it would never need to have the "Sorter" class in 
>>>>>>>>> its
>>>>>>>>> classpath.
>>>>>>>>>
>>>>>>>>> I'm ambivalent about whether merging it now is worth it.
>>>>>>>>>
>>>>>>>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> We can always fall back to the External sorter in case of merging
>>>>>>>>>> windows. I reckon in this case, values usually fit in memory, so it 
>>>>>>>>>> would
>>>>>>>>>> not be an issue.
>>>>>>>>>>
>>>>>>>>>> In case of non-merging windows, runner implementation would
>>>>>>>>>> probably require to group elements also by window during shuffle.
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> One concern would be merging windows. This happens after
>>>>>>>>>>> shuffle, so even if the shuffle were sorted you would need to do a 
>>>>>>>>>>> sorted
>>>>>>>>>>> merge of two sorted buffers.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I want to summarize my thoughts on the per key value sorting.
>>>>>>>>>>>>
>>>>>>>>>>>> Currently we have a separate module for sorting extension. The
>>>>>>>>>>>> extension contains *SortValues* transformation and
>>>>>>>>>>>> implementations of different sorters.
>>>>>>>>>>>>
>>>>>>>>>>>> Performance-wise it would be great to be able* to delegate
>>>>>>>>>>>> sorting to a runner* if it supports sort based shuffle. In
>>>>>>>>>>>> order to do so, we should *move SortValues transformation to
>>>>>>>>>>>> sdks-java-core*, so a runner can easily provide its own
>>>>>>>>>>>> implementation.
>>>>>>>>>>>>
>>>>>>>>>>>> The robust implementation is needed mainly for building of
>>>>>>>>>>>> HFiles for the HBase bulk load. When using external sorter, we 
>>>>>>>>>>>> often sort
>>>>>>>>>>>> the whole data set twice (shuffle may already did a job).
>>>>>>>>>>>>
>>>>>>>>>>>> SortValues can not use custom comparator, because we want to be
>>>>>>>>>>>> able to push sorting logic down to a byte based shuffle.
>>>>>>>>>>>>
>>>>>>>>>>>> The usage of SortValues transformation is little bit confusing.
>>>>>>>>>>>> I think we should add a *SortValues.perKey* method, which
>>>>>>>>>>>> accepts a secondary key extractor and coder, as the usage would be 
>>>>>>>>>>>> easier
>>>>>>>>>>>> to understand. Also, this explicitly states, that we sort values
>>>>>>>>>>>> *perKey* only and that we sort using an *encoded secondary key*.
>>>>>>>>>>>> Example usage:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> *PCollection<KV<String, Long>> input = ...;*
>>>>>>>>>>>> *input.apply(SortValues.perKey(KV::getValue,
>>>>>>>>>>>> BigEndianLongCoder.of()))*
>>>>>>>>>>>>
>>>>>>>>>>>> What do you think? Is this the right direction?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>>>
>>>>>>>>>>>> Links:
>>>>>>>>>>>> -
>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>>>>>>>>>>
>>>>>>>>>>>

Reply via email to