damccorm opened a new issue, #21210:
URL: https://github.com/apache/beam/issues/21210

   Code in Context:
   
   
[https://github.com/apache/beam/tree/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter](https://github.com/apache/beam/tree/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter)
   
    
   The current implementation only compares serialized bytes. It would be useful to be able to sort the elements with a custom comparator. I was able to prototype a solution but then hit some roadblocks, so I decided to open this ticket to get some feedback.
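   To illustrate why comparing serialized bytes is not always enough, here is a minimal sketch. It assumes the secondary key is an `int` encoded as 4 big-endian two's-complement bytes and that the encoded bytes are compared lexicographically as unsigned values. Under those assumptions a negative key sorts after a positive one. `ByteOrderDemo` and its methods are hypothetical helpers, not Beam API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helpers illustrating encoded-byte order vs. natural order for int keys.
class ByteOrderDemo {
  // Assumption: keys are encoded as 4 big-endian two's-complement bytes
  // (ByteBuffer's default byte order is big-endian).
  static byte[] encode(int key) {
    return ByteBuffer.allocate(4).putInt(key).array();
  }

  // Unsigned lexicographic comparison of the encoded bytes (Java 9+).
  static int compareEncoded(int a, int b) {
    return Arrays.compareUnsigned(encode(a), encode(b));
  }
}
```

   For example, `compareEncoded(-1, 1)` is positive because `-1` encodes as `FF FF FF FF` and `1` as `00 00 00 01`, even though `-1 < 1`.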
   
   *First approach*
   We add a `Comparator<SecondaryKey>` to SortValues and propagate it down to MemorySorter and ExternalSorter. This requires adding type parameters to all the involved classes, including BufferedExternalSorter, MemorySorter, ExternalSorter and the Sorter interface. Instead of creating a `List<KV<byte[], byte[]>>`, we would have a `List<KV<KeyT, ValueT>>`, sorted with a comparator of `KV<KeyT, ValueT>`.
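   A minimal sketch of what a typed in-memory sorter could look like under the first approach. `TypedMemorySorter` is a hypothetical class, not the existing `MemorySorter` API; it buffers deserialized key/value pairs and sorts them with a user-supplied comparator on the secondary key. It uses `Map.Entry` in place of Beam's `KV` so that it runs without Beam on the classpath.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical typed in-memory sorter: keeps deserialized pairs instead of byte[] pairs.
class TypedMemorySorter<KeyT, ValueT> {
  private final Comparator<KeyT> keyComparator;
  private final List<Map.Entry<KeyT, ValueT>> records = new ArrayList<>();

  TypedMemorySorter(Comparator<KeyT> keyComparator) {
    this.keyComparator = keyComparator;
  }

  // Buffers one deserialized record (Map.Entry stands in for Beam's KV).
  void add(KeyT key, ValueT value) {
    records.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
  }

  // Returns the records ordered by secondary key; List.sort is stable,
  // so records with equal keys keep their insertion order.
  List<Map.Entry<KeyT, ValueT>> sort() {
    List<Map.Entry<KeyT, ValueT>> sorted = new ArrayList<>(records);
    Comparator<Map.Entry<KeyT, ValueT>> byKey = Map.Entry.comparingByKey(keyComparator);
    sorted.sort(byKey);
    return sorted;
  }
}
```

   Because the comparator works on deserialized keys, a caller could, for instance, pass `Comparator.reverseOrder()` to get a descending sort, which a fixed byte comparison cannot express.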
   
   Potential issues:
   1) Since all of these classes are public, this change is not backward compatible. If we expect users to interact only with the SortValues PTransform, we can still keep SortValues itself backward compatible: if no comparator is specified, we fall back to the default binary comparator.
   2) Both NativeExternalSorter and MemorySorter have logic to calculate the memory used, which gets more complicated once deserialized objects are kept in memory. We could mitigate this by calling `Runtime.getRuntime().freeMemory()` before and after deserializing objects to estimate their size. (GC could free memory while deserialization is happening, which would make the measurement inaccurate, so we would have to keep a running average of the memory allocated per record, or use the ratio of serialized bytes to deserialized object size.)
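   The running-average mitigation could look roughly like the sketch below. `MemoryEstimator` is hypothetical: it is fed occasional samples of (serialized size, measured heap size) and then estimates the in-memory footprint of every record from its serialized size using the average ratio. How the heap size is measured (for example via `freeMemory()` deltas, with the GC caveat above) is left out, and the default ratio used before any sample arrives is an assumed tuning parameter.

```java
// Hypothetical estimator: tracks the average ratio of deserialized (heap) size
// to serialized size, and uses it to estimate the footprint of each record.
class MemoryEstimator {
  private double ratioSum = 0.0;
  private long samples = 0;
  // Assumed fallback ratio used before any sample has been recorded.
  private final double defaultRatio;

  MemoryEstimator(double defaultRatio) {
    this.defaultRatio = defaultRatio;
  }

  // Records one measurement; non-positive sizes (e.g. a GC skewed the
  // freeMemory() delta) are discarded instead of polluting the average.
  void addSample(long serializedBytes, long measuredHeapBytes) {
    if (serializedBytes <= 0 || measuredHeapBytes <= 0) {
      return;
    }
    ratioSum += (double) measuredHeapBytes / serializedBytes;
    samples++;
  }

  double averageRatio() {
    return samples == 0 ? defaultRatio : ratioSum / samples;
  }

  // Estimated heap footprint of a record with the given serialized size.
  long estimate(long serializedBytes) {
    return Math.round(serializedBytes * averageRatio());
  }
}
```

   This uses a plain cumulative mean for simplicity; an exponentially weighted average would adapt faster if record shapes change over the course of a bundle.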
   
   *Second approach*
   We add a `Comparator<SecondaryKey>` to SortValues, generate a `Comparator<byte[]>` from it, and use that instead. A small code snippet showing what the comparator would look like:
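```java
// Adapts a Comparator on decoded KVs into a Comparator on encoded bytes.
private static class OrderingComparator<KeyT, ValueT> implements Comparator<byte[]> {
  private final Comparator<KV<KeyT, ValueT>> comparator;
  private final KvCoder<KeyT, ValueT> kvCoder;

  OrderingComparator(Comparator<KV<KeyT, ValueT>> comparator, KvCoder<KeyT, ValueT> kvCoder) {
    this.comparator = comparator;
    this.kvCoder = kvCoder;
  }

  @Override
  public int compare(byte[] o1, byte[] o2) {
    try {
      // Decode both records on every comparison, then delegate to the user comparator.
      KV<KeyT, ValueT> kv1 = CoderUtils.decodeFromByteArray(kvCoder, o1);
      KV<KeyT, ValueT> kv2 = CoderUtils.decodeFromByteArray(kvCoder, o2);
      return comparator.compare(kv1, kv2);
    } catch (CoderException e) {
      throw new IllegalStateException("Failed to decode record for comparison", e);
    }
  }
}
```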
   
   Potential issues:
   1) Sorting is slower than with the first approach, because both records are deserialized on every comparison.
   2) Memory usage: we allocate objects inside the compare method. I am not sure whether escape analysis would let the JVM avoid heap-allocating them, or whether they would be allocated in the young generation. Either way, they are short-lived and should be cleaned up quickly, so they should not cause memory issues.
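   To make the cost in issue 1 concrete, and to show the backward-compatible fallback mentioned for the first approach, here is a self-contained sketch that does not depend on Beam. It assumes a hypothetical `Decoder` interface standing in for `CoderUtils.decodeFromByteArray`, counts decodes, and falls back to unsigned lexicographic byte comparison (`Arrays.compareUnsigned`, Java 9+) when no user comparator is supplied. `Decoder` and `ByteComparators` are illustrative names, not Beam API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the decode step of a Beam Coder.
interface Decoder<T> {
  T decode(byte[] bytes);
}

class ByteComparators {
  // Counts decodes so the per-comparison cost is visible.
  static final AtomicLong DECODES = new AtomicLong();

  // Returns a byte[] comparator: decodes both sides when a user comparator is
  // given, otherwise keeps the existing unsigned lexicographic byte order.
  static <T> Comparator<byte[]> forComparator(
      Comparator<? super T> userComparator, Decoder<T> decoder) {
    if (userComparator == null) {
      return Arrays::compareUnsigned;
    }
    return (o1, o2) -> {
      DECODES.addAndGet(2); // Two decodes on every single comparison.
      return userComparator.compare(decoder.decode(o1), decoder.decode(o2));
    };
  }

  // Example encoding for int keys: 4 big-endian bytes.
  static byte[] encodeInt(int v) {
    return ByteBuffer.allocate(4).putInt(v).array();
  }

  static int decodeInt(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }
}
```

   The sorters keep operating on `byte[]`, so their existing memory accounting stays valid, at the price of two decodes per comparison; a comparison sort of `n` elements performs on the order of `n log n` comparisons, so the decode count grows accordingly.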
   
   I will create a patch after getting some feedback on this.
   
   Imported from Jira 
[BEAM-12923](https://issues.apache.org/jira/browse/BEAM-12923). Original Jira 
may contain additional context.
   Reported by: nownikhil.

