[
https://issues.apache.org/jira/browse/BEAM-12923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17476321#comment-17476321
]
Kenneth Knowles commented on BEAM-12923:
----------------------------------------
I believe there has been some discussion on the dev list about comparators and
sorting values. Emailing [email protected] and/or browsing the archives at
https://lists.apache.org/[email protected] are the best way to
make progress on this, I think.
> Add Comparator to SortValues PTransformation
> --------------------------------------------
>
> Key: BEAM-12923
> URL: https://issues.apache.org/jira/browse/BEAM-12923
> Project: Beam
> Issue Type: Improvement
> Components: extensions-java-sorter
> Reporter: Nikhil Goyal
> Priority: P2
>
> Code in Context:
> [https://github.com/apache/beam/tree/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter]
>
> The current implementation compares only serialized bytes. It would be great
> to support a custom comparator for sorting the elements. I was able to
> prototype a solution but then hit some roadblocks, so I decided to open this
> ticket to get some feedback.
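To illustrate why byte order and value order can disagree, here is a minimal sketch. It assumes an unsigned lexicographic comparison of the encoded bytes and uses a hypothetical 4-byte big-endian int encoding chosen only for the demo (it is not one of Beam's coders); `ByteOrderDemo` and its members are made-up names.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;

final class ByteOrderDemo {
  // Assumption: the sorter orders records by unsigned lexicographic byte comparison (Java 9+).
  static final Comparator<byte[]> BYTE_ORDER = (a, b) -> Arrays.compareUnsigned(a, b);

  // Hypothetical encoding for illustration: 4-byte big-endian two's complement.
  static byte[] encode(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  public static void main(String[] args) {
    // -1 encodes as FF FF FF FF, which sorts after 00 00 00 01 in byte order.
    int byBytes = BYTE_ORDER.compare(encode(-1), encode(1));
    int byValue = Integer.compare(-1, 1);
    System.out.println("byte order sign:  " + Integer.signum(byBytes));
    System.out.println("value order sign: " + Integer.signum(byValue));
  }
}
```

With this encoding the two signs differ for -1 and 1, which is the kind of mismatch a user-supplied comparator would resolve.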
> *First approach*
> We add a Comparator<SecondaryKey> to SortValues and propagate it down to
> MemorySorter and ExternalSorter. This will require adding type parameters to
> all the classes, including BufferedExternalSorter, MemorySorter,
> ExternalSorter, and the Sorter interface. Instead of creating a
> List<KV<byte[], byte[]>>, we will have a List<KV<KeyT, ValueT>>, sorted with a
> Comparator<KV<KeyT, ValueT>>.
> Potential issues:
> 1) Since all the classes are public, this change is not backward compatible.
> If we expect users to interact only with the SortValues PTransform, we can
> keep SortValues itself backward compatible (if no comparator is specified, we
> fall back to the default binary comparator).
> 2) Both NativeExternalSorter and MemorySorter have logic to calculate the
> memory used, which gets more complicated once we keep deserialized objects in
> memory. We can work around this by reading `Runtime.getRuntime().freeMemory()`
> before and after deserializing objects to estimate their size. (It is
> possible that GC frees up some memory between the two readings and we get an
> inaccurate figure. We will have to keep a running average of the memory
> allocated per record, or take the ratio of serialized bytes to deserialized
> object size.)
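The ratio idea in point 2 could be sketched roughly as follows. This is only an illustration under stated assumptions: `DeserializedSizeEstimator`, `addSample`, `estimate`, and `usedHeap` are hypothetical names, not part of the sorter extension, and samples that look GC-distorted (non-positive heap delta) are simply discarded.

```java
final class DeserializedSizeEstimator {
  private long sampledSerializedBytes;
  private long sampledHeapBytes;

  // Used heap as seen by the JVM; the difference between two readings
  // taken around a deserialization gives a (noisy) heap delta.
  static long usedHeap() {
    Runtime rt = Runtime.getRuntime();
    return rt.totalMemory() - rt.freeMemory();
  }

  // Records one sample. A non-positive delta usually means GC ran
  // between the two readings, so the sample is ignored.
  void addSample(long serializedBytes, long heapDeltaBytes) {
    if (serializedBytes <= 0 || heapDeltaBytes <= 0) {
      return;
    }
    sampledSerializedBytes += serializedBytes;
    sampledHeapBytes += heapDeltaBytes;
  }

  // Estimates the in-memory size of a record from its serialized size,
  // using the running ratio of heap bytes to serialized bytes.
  long estimate(long serializedBytes) {
    if (sampledSerializedBytes == 0) {
      return serializedBytes; // no usable samples yet: fall back to the serialized size
    }
    double ratio = (double) sampledHeapBytes / sampledSerializedBytes;
    return Math.round(serializedBytes * ratio);
  }
}
```

A sorter could feed `addSample` for a fraction of records and use `estimate` for the rest, which keeps the bookkeeping cheap while smoothing out individual bad readings.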
> *Second approach*
> We add a Comparator<SecondaryKey> to SortValues, generate a
> Comparator<byte[]> from it, and use that instead. A small code snippet showing
> what the comparator would look like:
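> private static class OrderingComparator<KeyT, ValueT> implements
>     Comparator<byte[]> {
>   private final Comparator<KV<KeyT, ValueT>> comparator;
>   private final KvCoder<KeyT, ValueT> kvCoder;
>   OrderingComparator(Comparator<KV<KeyT, ValueT>> comparator, KvCoder<KeyT, ValueT> kvCoder) {
>     this.comparator = comparator;
>     this.kvCoder = kvCoder;
>   }
>   @Override
>   public int compare(byte[] o1, byte[] o2) {
>     try {
>       KV<KeyT, ValueT> kv1 = CoderUtils.decodeFromByteArray(kvCoder, o1);
>       KV<KeyT, ValueT> kv2 = CoderUtils.decodeFromByteArray(kvCoder, o2);
>       return comparator.compare(kv1, kv2);
>     } catch (CoderException e) {
>       // decodeFromByteArray declares the checked CoderException.
>       throw new IllegalStateException("Failed to decode record for comparison", e);
>     }
>   }
> }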
> Potential issues:
> 1) The sort operation is slower than in the first approach because objects
> are serialized up front and then deserialized on every comparison.
> 2) Memory usage: we are allocating objects inside the compare method. I am
> not sure whether they would be allocated only on the stack (because of escape
> analysis) or in the young generation. Either way they should get cleaned up
> quickly, avoiding any memory issues.
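To get a feel for the per-comparison decoding cost in point 1, here is a self-contained sketch using only the JDK. It stands in a plain `Function<byte[], T>` for the coder and UTF-8 strings for the records; `DecodingComparatorDemo`, `decoding`, and `sortEncoded` are hypothetical names, and the counter exists only to make the number of decodes visible.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

final class DecodingComparatorDemo {
  // Wraps a typed comparator so it can order encoded records.
  // Every comparison decodes both operands, which is the cost discussed above.
  static <T> Comparator<byte[]> decoding(
      Function<byte[], T> decoder, Comparator<T> order, AtomicLong decodeCount) {
    return (a, b) -> {
      decodeCount.addAndGet(2);
      return order.compare(decoder.apply(a), decoder.apply(b));
    };
  }

  // Encodes the values, sorts the encoded form with the wrapped comparator,
  // and decodes the result so the final order is easy to inspect.
  static List<String> sortEncoded(List<String> values, AtomicLong decodeCount) {
    Function<byte[], String> decoder = bytes -> new String(bytes, StandardCharsets.UTF_8);
    List<byte[]> encoded = new ArrayList<>();
    for (String value : values) {
      encoded.add(value.getBytes(StandardCharsets.UTF_8));
    }
    encoded.sort(decoding(decoder, String.CASE_INSENSITIVE_ORDER, decodeCount));
    List<String> sorted = new ArrayList<>();
    for (byte[] bytes : encoded) {
      sorted.add(decoder.apply(bytes));
    }
    return sorted;
  }

  public static void main(String[] args) {
    AtomicLong decodes = new AtomicLong();
    System.out.println(sortEncoded(List.of("B", "a", "c"), decodes));
    System.out.println("decodes during sort: " + decodes.get());
  }
}
```

The decode count grows with the number of comparisons rather than the number of records, so caching decoded keys or comparing on a cheaper derived key may be worth considering if this approach is pursued.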
> I will create a patch after getting some feedback on this.