[ https://issues.apache.org/jira/browse/PHOENIX-180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14093747#comment-14093747 ]

James Taylor commented on PHOENIX-180:
--------------------------------------

bq. Should the stats collection be per region and per CF? Because in the major 
compaction case we would be able to collect the stats per family (on a region). 
Similarly, when the StatsCollector endpoint is used per region, we could issue a 
scan and collect the stats per CF. So we may have to add the CF name to the 
primary key of the table and then group the stats per CF/region/table -> what is 
the max key, min key, and guideposts.
This is a very good question. My initial thought was to come up with guideposts 
that would mimic what HBase would do if you asked it to split a given table into 
N regions. But how would it handle multiple column families? At the end of a 
split, a table has a single set of region boundaries that are honored across all 
column families, right? Does it just choose one of the cfs as the basis for the 
region boundaries? Or does it do some kind of averaging of the region keys using 
byte arithmetic?

However, if it's easier to collect these guideposts per cf, then we may be able 
to leverage this when a query comes in, based on the cf(s) being scanned. How 
would we handle the case of multiple cf(s) being scanned, though?
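
For reference, the per-CF proposal quoted above amounts to a stats row key 
roughly like the following. This is only a sketch to make the grouping concrete; 
the class name, method, and separator choice are assumptions, not anything that 
exists in Phoenix today:
{code}
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical composite row key for the stats table: one stats row per
// table + region + column family, so guideposts can be grouped per CF.
public class StatsRowKey {
    private static final byte[] SEPARATOR = new byte[] { 0 };

    public static byte[] of(byte[] tableName, byte[] regionStartKey, byte[] cfName) {
        byte[] key = Bytes.add(tableName, SEPARATOR, regionStartKey);
        return Bytes.add(key, SEPARATOR, cfName);
    }
}
{code}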

bq. In case there are no stats collected, should a scan using ParallelIterators 
use the existing way of determining the region splits?
Yes, that makes sense, I suppose. Either that, or just use the region 
boundaries and don't try to break them up further by using the Bytes.split 
method.
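
To illustrate that fallback, the chunking code could be guarded with something 
like this (a sketch only; hasGuidePosts is a hypothetical helper, not an 
existing method):
{code}
// Sketch: when no guideposts have been collected for this region, hand out
// one chunk per region instead of trying to split it further.
if (!hasGuidePosts(table, region)) { // hypothetical helper
    keyRangesPerRegion.put(region, ParallelIterators.TO_KEY_RANGE.apply(region));
} else {
    // ... derive the per-region key ranges from the collected guideposts
}
{code}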

bq. How should the guidepost information be consumed in ParallelIterators?
The guideposts would determine the chunk of work given to each parallel scan. 
The consumption should be entirely in this code in 
DefaultParallelIteratorRegionSplitter:
{code}
// Both startKey and stopKey will be empty the first time
if (Bytes.compareTo(startKey, stopKey) >= 0 || (boundaries = Bytes.split(startKey, stopKey, splitsPerRegion - 1)) == null) {
    // Bytes.split may return null if the key space
    // between start and end key is too small
    keyRangesPerRegion.put(region, ParallelIterators.TO_KEY_RANGE.apply(region));
} else {
    keyRangesPerRegion.put(region, KeyRange.getKeyRange(lowerUnbound ? KeyRange.UNBOUND : boundaries[0], boundaries[1]));
    if (boundaries.length > 1) {
        for (int i = 1; i < boundaries.length - 2; i++) {
            keyRangesPerRegion.put(region, KeyRange.getKeyRange(boundaries[i], true, boundaries[i + 1], false));
        }
        keyRangesPerRegion.put(region, KeyRange.getKeyRange(boundaries[boundaries.length - 2], true, upperUnbound ? KeyRange.UNBOUND : boundaries[boundaries.length - 1], false));
    }
}
{code}
Instead of using Bytes.split(), you'd retrieve the guideposts from the PTable 
for the given region. I don't think we'd need a target and max concurrency 
configuration any longer, as these guideposts would form the "minimum unit of 
work".

> Use stats to guide query parallelization
> ----------------------------------------
>
>                 Key: PHOENIX-180
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-180
>             Project: Phoenix
>          Issue Type: Task
>            Reporter: James Taylor
>            Assignee: ramkrishna.s.vasudevan
>              Labels: enhancement
>
> We're currently not using stats, beyond a table-wide min key/max key cached 
> per client connection, to guide parallelization. If a query targets just a 
> few regions, we don't know how to evenly divide the work among threads, 
> because we don't know the data distribution. This other issue 
> (https://github.com/forcedotcom/phoenix/issues/64) targets gathering and 
> maintaining the stats, while this issue is focused on using them.
> The main changes are:
> 1. Create a PTableStats interface that encapsulates the stats information 
> (and implements the Writable interface so that it can be serialized back from 
> the server); see the sketch below.
> 2. Add a stats member variable off of PTable to hold this.
> 3. From MetaDataEndPointImpl, look up the stats row for the table in the 
> stats table. If the stats have changed, return a new PTable with the updated 
> stats information. We may want to cache the stats row and have the stats 
> gatherer invalidate the cached row when it's updated, so we don't always have 
> to do a scan for it. Additionally, it would be ideal if we could use the same 
> split policy on the stats table that we use on the system table, to guarantee 
> co-location of data (for the sake of caching).
> 4. Modify the client-side parallelization (ParallelIterators.getSplits()) to 
> use this information to guide how to chunk up the scans at query time.
> This should help boost query performance, especially in cases where the data 
> is highly skewed. It's likely the cause of the slowness reported in this 
> issue: https://github.com/forcedotcom/phoenix/issues/47.
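
For step 1 above, a minimal sketch of what a PTableStats interface along those 
lines might look like (the accessor names are assumptions, not the committed API):
{code}
import java.util.List;
import org.apache.hadoop.io.Writable;

// Sketch only: encapsulates the stats gathered for a table so they can be
// serialized back from the server (via Writable) and used by the client
// to chunk up scans.
public interface PTableStats extends Writable {
    byte[] getMinKey();           // assumed accessor
    byte[] getMaxKey();           // assumed accessor
    List<byte[]> getGuidePosts(); // assumed accessor: guideposts across the table, in key order
}
{code}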


