[ 
https://issues.apache.org/jira/browse/PHOENIX-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903978#comment-14903978
 ] 

Samarth Jain commented on PHOENIX-2126:
---------------------------------------

[~ankit.singhal] - Have you experimented with using a single heap instead of 
creating buckets.size() heaps? It would be good to understand what benefit you 
are getting out of using n threads operating on n buckets vs a single thread 
picking up the next iterator using a min/max heap.
For K salt buckets, with each bucket returning N tuples, you would do O(NlogK) 
comparisons to sort.

I think the real benefit you are deriving from doing things in multiple threads 
is because you are pulling everything from HBase to client memory and sorting 
each bucket in parallel. However, that won't scale when there are large number 
of rows returned which might cause client to go OOM.

Also, it looks like your prepareIterators method is assigning every iterator to 
bucket 0 once the index i >= nThreads. Why not just assign the iterator to the 
next bucket? Probably better to do k = k % nThreads?
{code}
Map<Integer, List<PeekingResultIterator>> buckets=new HashMap<Integer, 
List<PeekingResultIterator>>();
+            int k=0;
+            for(int i=0;i<iterators.size();i++){
+                if(i>=NTHREDS){
+                    k=0;
+                }
+                List<PeekingResultIterator> list = buckets.get(k);
+                if(list==null){
+                    list=new ArrayList<PeekingResultIterator>();
+                    buckets.put(k, list);
+                }
+                list.add(iterators.get(i));
+                k++;
+            }
{code}

Minor nit: Rename NTHREDS to nThreads.

> Improving performance of merge sort by multi-threaded and minheap 
> implementation
> --------------------------------------------------------------------------------
>
>                 Key: PHOENIX-2126
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-2126
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 4.1.0, 4.2.0
>            Reporter: Ankit Singhal
>         Attachments: PHOENIX-2126_v1.0.patch, PHOENIX-2126_v2.0.patch
>
>
> {code}
> CREATE TABLE IF NOT EXISTS test (
> dim1 INTEGER NOT NULL,
> A.B INTEGER,
> A.M DECIMAL,
> CONSTRAINT PK PRIMARY KEY
> (dim1))
> SALT_BUCKETS =256,DEFAULT_COLUMN_FAMILY='A';
> {code}
> *Query to benchmark:-*
> {code}
> select dim1,sum(b),sum(m) from test where Datemth>=201505 and Datemth<=201505 
> and dim1 IS NOT NULL  group by dim1 order by sum(m) desc nulls last limit 10;
> {code}
> *current scenario:-*
> *CASE 1: * consider the case when dim1 is high cardinality attribute (10K+) 
> and table have salt bucket set to 256, we will get 256 iterators from above 
> query at the client and MergeSortRowKeyResultIterator has to merge these 256 
> iterators with single thread. So let's say each iterator has 10k tuples 
> returned, then merge sort needs to merge 2.5M tuples which will be costly if 
> it is done with single thread and the query spend most of its time on client
> *CASE 2: * consider the case when dim1 is high cardinality attribute (10K+) 
> and table have salt bucket set to 1, we will get 1 iterator from  above query 
> at the client and MergeSortRowKeyResultIterator doesn't need to merge 
> anything. Here, it is fine with single thread.
> *CASE 3: * consider the case when dim1 is low cardinality attribute (10-100) 
> and table have salt bucket set to 256, we will get 256 iterator from  above 
> query at the client and MergeSortRowKeyResultIterator has to merge these 256 
> iterators with single thread. here the single thread is also fine as he has 
> to merge only 2560 tuples.
> *Solution for case1 problem is:-*
> Optimized the implementation of merging 'n'-sorted iterators(having 'm' 
> tuples)  by using "min heap" which optimizes the time complexity from 
> O(n2m) to O(nmLogn) (as heapify takes (Logn) time).
> And, By using multiple-threads('t') to process group of iterators which 
> further optimized the complexity to 
> T(nm)=T(nm)/t+T(t)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to