[ https://issues.apache.org/jira/browse/BEAM-12923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-12923:
-----------------------------------
    Component/s: extensions-java-sorter
                     (was: beam-community)

> Add Comparator to SortValues PTransformation
> --------------------------------------------
>
>                 Key: BEAM-12923
>                 URL: https://issues.apache.org/jira/browse/BEAM-12923
>             Project: Beam
>          Issue Type: Improvement
>          Components: extensions-java-sorter
>            Reporter: Nikhil Goyal
>            Priority: P2
>
> Code in Context:
> [https://github.com/apache/beam/tree/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter]
>  
> The current implementation only compares serialized bytes. It would be great
> to have a custom comparator to sort the elements. I was able to prototype a
> solution but then hit some roadblocks, so I decided to open this ticket to get
> some feedback.
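Why a byte-only ordering can disagree with the natural ordering of the elements can be shown with a minimal sketch. It assumes the default comparison is unsigned lexicographic over the encoded bytes, and it uses a fixed-width big-endian int encoding as a stand-in for a real coder. `ByteOrderDemo`, `encode` and `compareEncoded` are hypothetical names for illustration, not Beam API; `Arrays.compareUnsigned` needs Java 9+.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class ByteOrderDemo {
  // Fixed-width big-endian two's-complement encoding; a stand-in for a real
  // coder (assumption for illustration only).
  static byte[] encode(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  // Compares the encoded forms as unsigned bytes, lexicographically (Java 9+),
  // which is the kind of ordering a byte-only sorter applies.
  static int compareEncoded(int a, int b) {
    return Arrays.compareUnsigned(encode(a), encode(b));
  }

  public static void main(String[] args) {
    // -1 encodes as FF FF FF FF and 1 as 00 00 00 01, so the byte order
    // puts -1 after 1 even though -1 < 1 numerically.
    System.out.println("natural: " + Integer.compare(-1, 1));
    System.out.println("encoded: " + compareEncoded(-1, 1));
  }
}
```

With this encoding, non-negative ints still sort correctly by bytes; it is the sign bit that breaks the match, which a custom comparator over decoded values would avoid.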
> *First approach*
> We add a Comparator<SecondaryKey> to SortValues and propagate it down to
> MemorySorter and ExternalSorter. This requires adding type parameters to all
> of the sorter classes, including BufferedExternalSorter, MemorySorter,
> ExternalSorter and the Sorter interface. Instead of building a
> List<KV<byte[], byte[]>>, we would keep a List<KV<KeyT, ValueT>> and sort it
> with a comparator over KV<KeyT, ValueT>.
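A minimal sketch of what a typed in-memory sorter for the first approach might look like. `TypedMemorySorter` is a hypothetical class, not an existing Beam sorter, and `Map.Entry` stands in for Beam's `KV` so the example runs without Beam on the classpath. It orders by secondary key only, which is an assumption about how the comparator would be applied.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical typed in-memory sorter: it buffers deserialized
// (secondary key, value) pairs instead of byte[] pairs and orders them with a
// user-supplied comparator. Map.Entry stands in for Beam's KV.
class TypedMemorySorter<KeyT, ValueT> {
  private final Comparator<KeyT> keyComparator;
  private final List<Map.Entry<KeyT, ValueT>> records = new ArrayList<>();

  TypedMemorySorter(Comparator<KeyT> keyComparator) {
    this.keyComparator = keyComparator;
  }

  void add(KeyT key, ValueT value) {
    records.add(new SimpleImmutableEntry<>(key, value));
  }

  // Orders by key only; List.sort is stable, so values with equal keys keep
  // their insertion order. Returns a copy so callers cannot mutate the buffer.
  List<Map.Entry<KeyT, ValueT>> sort() {
    records.sort(Map.Entry.comparingByKey(keyComparator));
    return new ArrayList<>(records);
  }

  public static void main(String[] args) {
    TypedMemorySorter<Integer, String> sorter =
        new TypedMemorySorter<>(Comparator.reverseOrder());
    sorter.add(1, "a");
    sorter.add(3, "b");
    sorter.add(2, "c");
    System.out.println(sorter.sort()); // [3=b, 2=c, 1=a]
  }
}
```

The descending order in the usage example is something a fixed byte comparator cannot express without re-encoding the keys, which is the kind of flexibility the ticket asks for.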
> Potential issues:
> 1) Since all of these classes are public, this change is not backward
> compatible. If we expect users to interact only with the SortValues
> PTransform, we can keep SortValues itself backward compatible: if no
> comparator is specified, we fall back to the default binary comparator.
> 2) Both NativeExternalSorter and MemorySorter have logic to calculate the
> memory used, which gets more complicated once we keep deserialized objects in
> memory. We can mitigate this by reading `Runtime.getRuntime().freeMemory()`
> before and after deserializing objects to estimate their size. (GC may free
> some memory while deserialization is in progress, making a single
> measurement inaccurate, so we would have to keep a running average of the
> memory allocated per record, or use the ratio of serialized bytes to
> deserialized object size.)
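One way to combine the two mitigations in point 2 is a running ratio of measured heap growth to serialized size. This is a minimal sketch under that assumption; `DeserializedSizeEstimator` and its methods are hypothetical, and the heap readings it relies on are inherently noisy (GC timing and thread-local allocation buffers can hide or distort small allocations).

```java
// Hypothetical per-record memory estimator: heap deltas measured with
// Runtime.freeMemory() are accumulated into a running ratio of deserialized
// size to serialized size, which then estimates memory for new records.
final class DeserializedSizeEstimator {
  private long serializedBytesSampled;
  private long heapBytesSampled;

  // Heap currently in use. Reading it before and after deserialization is
  // noisy: a GC in between can make the delta too small or even negative.
  static long usedHeapBytes() {
    Runtime rt = Runtime.getRuntime();
    return rt.totalMemory() - rt.freeMemory();
  }

  // Records one measurement; empty records and non-positive heap deltas
  // (for example when a GC ran mid-measurement) are ignored.
  void recordSample(long serializedBytes, long heapDeltaBytes) {
    if (serializedBytes <= 0 || heapDeltaBytes <= 0) {
      return;
    }
    serializedBytesSampled += serializedBytes;
    heapBytesSampled += heapDeltaBytes;
  }

  // Deserialized-to-serialized ratio over all samples; 1.0 until a sample exists.
  double ratio() {
    return serializedBytesSampled == 0
        ? 1.0
        : (double) heapBytesSampled / serializedBytesSampled;
  }

  // Estimated heap cost of a record that is serializedBytes long when encoded.
  long estimate(long serializedBytes) {
    return Math.round(serializedBytes * ratio());
  }

  public static void main(String[] args) {
    DeserializedSizeEstimator estimator = new DeserializedSizeEstimator();
    long before = usedHeapBytes();
    int[] decoded = new int[1024]; // stands in for one deserialized record
    long after = usedHeapBytes();
    estimator.recordSample(4L * decoded.length, after - before);
    System.out.println("ratio=" + estimator.ratio() + " for " + decoded.length + " ints");
  }
}
```

Aggregating over many samples is what makes the noisy individual readings usable; the sorter would then compare `estimate(...)` totals against its memory budget instead of raw byte counts.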
> *Second approach*
> We add a Comparator<SecondaryKey> to SortValues, generate a
> Comparator<byte[]> from it, and use that instead. Here is a small code
> snippet showing what the comparator would look like:
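> private static class OrderingComparator<KeyT, ValueT> implements Comparator<byte[]> {
>   private final Comparator<KV<KeyT, ValueT>> comparator;
>   private final KvCoder<KeyT, ValueT> kvCoder;
>
>   OrderingComparator(Comparator<KV<KeyT, ValueT>> comparator, KvCoder<KeyT, ValueT> kvCoder) {
>     this.comparator = comparator;
>     this.kvCoder = kvCoder;
>   }
>
>   @Override
>   public int compare(byte[] o1, byte[] o2) {
>     try {
>       // Decode both serialized records, then delegate to the user comparator.
>       KV<KeyT, ValueT> kv1 = CoderUtils.decodeFromByteArray(kvCoder, o1);
>       KV<KeyT, ValueT> kv2 = CoderUtils.decodeFromByteArray(kvCoder, o2);
>       return comparator.compare(kv1, kv2);
>     } catch (CoderException e) {
>       throw new RuntimeException(e);
>     }
>   }
> }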
> Potential issues:
> 1) Sorting is slower than in the first approach, because both objects are
> deserialized on every comparison.
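To put a rough number on issue 1, here is a self-contained model of the second approach that counts decode calls during a sort. A plain `Function<byte[], T>` stands in for a Beam `Coder`, and a fixed-width big-endian int encoding stands in for real records; `DecodingComparatorDemo`, `decodingComparator` and `countDecodes` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Standalone model of the second approach: encoded records are sorted with a
// Comparator<byte[]> that decodes both operands on every call. A plain
// Function<byte[], T> stands in for a Beam Coder (assumption).
final class DecodingComparatorDemo {
  static <T> Comparator<byte[]> decodingComparator(
      Function<byte[], T> decoder, Comparator<? super T> comparator) {
    return (o1, o2) -> comparator.compare(decoder.apply(o1), decoder.apply(o2));
  }

  static byte[] encode(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  static int decode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }

  // Sorts n random encoded ints by numeric value; returns the decode count.
  static long countDecodes(int n, long seed) {
    AtomicLong decodes = new AtomicLong();
    Function<byte[], Integer> countingDecoder = bytes -> {
      decodes.incrementAndGet();
      return decode(bytes);
    };
    Random random = new Random(seed);
    List<byte[]> records = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      records.add(encode(random.nextInt()));
    }
    records.sort(decodingComparator(countingDecoder, Comparator.naturalOrder()));
    return decodes.get();
  }

  public static void main(String[] args) {
    int n = 10_000;
    // The first approach would decode each record once (n decodes); here every
    // comparison decodes two records, so the count grows roughly with n log n.
    System.out.println("decodes for " + n + " records: " + countDecodes(n, 42L));
  }
}
```

Any comparison sort needs at least n - 1 comparisons, so this approach performs at least 2(n - 1) decodes, and in practice many more on unsorted input.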
> 2) Memory usage: we allocate objects inside the compare method. I am not sure
> whether escape analysis would let the JVM avoid heap allocation for them, or
> whether they would be allocated in the young generation. Either way they are
> short-lived and should be collected quickly, avoiding memory issues.
> I will create a patch after getting some feedback on this.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
