[ 
https://issues.apache.org/jira/browse/PHOENIX-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankit Singhal updated PHOENIX-2126:
-----------------------------------
    Attachment: PHOENIX-2126_v3.patch


Thanks, [~samarthjain], for reviewing the patch and reporting the bug.

Because of the bug below (the wrong variable was used by mistake), I was 
previously only getting the benefit of the min/max heap optimization. After 
fixing it, I get the benefit of multi-threading as well.
My overall query response time improves from *64 seconds (single thread without 
min/max heap) to 9 seconds (multiple threads with min/max heap) (7x faster)* on 
a high-cardinality result.

{code}
Map<Integer, List<PeekingResultIterator>> buckets=new HashMap<Integer, List<PeekingResultIterator>>();
+            int k=0;
+            for(int i=0;i<iterators.size();i++){
+                if(k>=NTHREDS){ // was a bug
+                    k=0;
+                }
+                List<PeekingResultIterator> list = buckets.get(k);
+                if(list==null){
+                    list=new ArrayList<PeekingResultIterator>();
+                    buckets.put(k, list);
+                }
+                list.add(iterators.get(i));
+                k++;
+            }
{code}
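
A standalone version of the same round-robin distribution, in case anyone wants 
to try it outside the patch. This is only a sketch: the class name 
`RoundRobinBuckets`, the method `distribute` and the generic element type are 
illustrative and not part of the patch, and `i % numBuckets` stands in for the 
explicit `k` counter that is reset once it reaches the thread count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the PHOENIX-2126 patch.
final class RoundRobinBuckets {

    // Puts items.get(i) into bucket (i % numBuckets). This matches a counter k
    // that is incremented per item and reset to 0 when it reaches numBuckets.
    // Only as many buckets as are actually needed are created.
    static <T> List<List<T>> distribute(List<T> items, int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive");
        }
        int used = Math.min(numBuckets, items.size());
        List<List<T>> buckets = new ArrayList<>();
        for (int b = 0; b < used; b++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            buckets.get(i % used).add(items.get(i));
        }
        return buckets;
    }
}
```

With 256 iterators and 8 buckets, each bucket ends up with 32 iterators.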

[~samarthjain], as you suggested, I also tested with a single bucket, but it is 
2-3x slower than the run with 8 buckets (8 threads).

We will not get OOM errors, because we are using *mapped buffers*, which 
spill to disk whenever memory usage reaches the threshold set by 
phoenix.query.spoolThresholdBytes. OrderedResultIterator already uses the same 
approach, since it also has to sort in memory when a sort is requested on an 
unordered attribute.

Please find attached the updated patch with the bug fix.

Regards,
Ankit Singhal

> Improving performance of merge sort by multi-threaded and minheap 
> implementation
> --------------------------------------------------------------------------------
>
>                 Key: PHOENIX-2126
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-2126
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 4.1.0, 4.2.0
>            Reporter: Ankit Singhal
>         Attachments: PHOENIX-2126_v1.0.patch, PHOENIX-2126_v2.0.patch, 
> PHOENIX-2126_v3.patch
>
>
> {code}
> CREATE TABLE IF NOT EXISTS test (
> dim1 INTEGER NOT NULL,
> A.B INTEGER,
> A.M DECIMAL,
> CONSTRAINT PK PRIMARY KEY
> (dim1))
> SALT_BUCKETS =256,DEFAULT_COLUMN_FAMILY='A';
> {code}
> *Query to benchmark:-*
> {code}
> select dim1,sum(b),sum(m) from test where Datemth>=201505 and Datemth<=201505 
> and dim1 IS NOT NULL  group by dim1 order by sum(m) desc nulls last limit 10;
> {code}
> *Current scenario:*
> *CASE 1:* dim1 is a high-cardinality attribute (10K+) and the table has 256 
> salt buckets. The query above returns 256 iterators to the client, and 
> MergeSortRowKeyResultIterator has to merge them on a single thread. If each 
> iterator returns 10K tuples, the merge sort has to process about 2.5M tuples, 
> which is costly on a single thread, so the query spends most of its time on 
> the client.
> *CASE 2:* dim1 is a high-cardinality attribute (10K+) and the table has 1 
> salt bucket. The query above returns 1 iterator to the client, and 
> MergeSortRowKeyResultIterator does not need to merge anything. A single 
> thread is fine here.
> *CASE 3:* dim1 is a low-cardinality attribute (10-100) and the table has 256 
> salt buckets. The query above returns 256 iterators to the client, and 
> MergeSortRowKeyResultIterator has to merge them on a single thread. A single 
> thread is also fine here, as it has to merge only 2560 tuples (256 iterators 
> x 10 tuples).
> *Solution for the CASE 1 problem:*
> Optimized the merging of 'n' sorted iterators (each having 'm' tuples) by 
> using a min heap, which reduces the time complexity from O(n^2 m) to 
> O(nm log n) (as each heap operation takes O(log n) time).
> In addition, using multiple threads ('t') to process groups of iterators in 
> parallel further reduces the cost to 
> T_parallel(nm) = T(nm)/t + T(t)
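
To make the approach in the description concrete, here is a minimal sketch of a 
min-heap merge of sorted inputs, plus a variant that merges groups of inputs on 
separate threads and then merges the partial results. It is not the code from 
the patch: the class `HeapMergeSketch`, its methods, and the use of plain 
sorted lists in place of PeekingResultIterator are all assumptions made for 
illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch; sorted lists stand in for the per-bucket iterators.
final class HeapMergeSketch {

    // One heap entry: the current head of a source and the rest of that source.
    private static final class Head<T> {
        final T value;
        final Iterator<T> rest;
        Head(T value, Iterator<T> rest) { this.value = value; this.rest = rest; }
    }

    // Merges n sorted lists using a min heap that holds at most one entry per
    // list, so every output element costs O(log n) instead of an O(n) scan.
    static <T> List<T> merge(List<List<T>> sortedLists, Comparator<? super T> cmp) {
        PriorityQueue<Head<T>> heap =
                new PriorityQueue<>((a, b) -> cmp.compare(a.value, b.value));
        for (List<T> list : sortedLists) {
            Iterator<T> it = list.iterator();
            if (it.hasNext()) {
                heap.add(new Head<>(it.next(), it));
            }
        }
        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head<T> smallest = heap.poll();
            out.add(smallest.value);
            if (smallest.rest.hasNext()) {
                heap.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }

    // Splits the lists round-robin into `threads` groups, merges each group on
    // its own thread, then merges the partial results on the calling thread.
    static <T> List<T> parallelMerge(List<List<T>> sortedLists,
                                     Comparator<? super T> cmp, int threads) {
        List<List<List<T>>> groups = new ArrayList<>();
        for (int g = 0; g < threads; g++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < sortedLists.size(); i++) {
            groups.get(i % threads).add(sortedLists.get(i));
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<T>>> futures = new ArrayList<>();
            for (List<List<T>> group : groups) {
                Callable<List<T>> task = () -> merge(group, cmp);
                futures.add(pool.submit(task));
            }
            List<List<T>> partials = new ArrayList<>();
            for (Future<List<T>> f : futures) {
                partials.add(f.get());
            }
            return merge(partials, cmp);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("merge task failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

As a rough order-of-magnitude check with the CASE 1 numbers (n = 256, 
m = 10,000), n^2 m is about 6.6 x 10^8 steps, while nm log2(n) is about 
2.0 x 10^7. Note that the final merge of the partial results still touches 
every tuple, so the speedup from adding threads is less than linear.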



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
