[ 
https://issues.apache.org/jira/browse/PHOENIX-5313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16867135#comment-16867135
 ] 

Chinmay Kulkarni commented on PHOENIX-5313:
-------------------------------------------

Serializing the QueryPlan to send it over to the mappers would require us to add 
Protobuf serialization for QueryPlan, and I don't think this is necessary at all.

Currently, in the driver code, when we get the InputSplits via 
PhoenixInputFormat#getSplits, this calls PhoenixInputFormat#getQueryPlan, which 
not only gets the queryPlan but also initializes it to set up the parallel 
scans. This is mandatory in the driver code, since it populates the splits and 
scans we need to build the actual InputSplits for the MR job.
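
For reference, the driver-side path today looks roughly like this (a simplified 
sketch based on the links above; compileSelectStatement and generateSplits are 
stand-ins for the existing private helpers, not the exact PhoenixInputFormat code):

{code:java}
// Simplified sketch of the current driver-side flow in PhoenixInputFormat
// (method names/signatures are approximate).
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    final Configuration conf = context.getConfiguration();
    // getQueryPlan() currently compiles the SELECT *and* initializes the plan.
    final QueryPlan queryPlan = getQueryPlan(context, conf);
    // The initialized plan carries the parallel scans/splits, which is exactly
    // what the driver needs to produce one InputSplit per split.
    return generateSplits(queryPlan, conf); // stand-in for the existing helper
}

private QueryPlan getQueryPlan(JobContext context, Configuration conf) throws IOException {
    try {
        QueryPlan queryPlan = compileSelectStatement(conf); // stand-in for the compilation logic
        // Initializing the iterator is the step that sets up the parallel scans,
        // i.e. the step that reads all RegionLocations from meta.
        queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
        return queryPlan;
    } catch (SQLException e) {
        throw new IOException(e);
    }
}
{code}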

However, when each map task calls PhoenixInputFormat#createRecordReader, we 
call PhoenixInputFormat#getQueryPlan again (which calls queryPlan.iterator() 
again). Later, when each mapper initializes its RecordReader, we use the passed 
queryPlan but *create our own iterator* using the queryPlan (See 
[this|https://github.com/apache/phoenix/blob/master/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java#L151]).
 This newly created iterator is wrapped up inside our 
[resultSet|https://github.com/apache/phoenix/blob/master/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java#L159]
 which we use to [read 
fields|https://github.com/apache/phoenix/blob/master/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java#L176].
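
Put differently, the mapper side already does something like the following 
(again a simplified sketch, not the actual implementation; buildResultIterator 
is a stand-in, see the PhoenixRecordReader links above for the real code):

{code:java}
// Simplified sketch of PhoenixRecordReader#initialize (approximate).
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
    final PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
    final List<Scan> scans = pSplit.getScans();
    try {
        // Build our own result iterator directly from the scans carried by this
        // split -- the parallel scans set up inside getQueryPlan() on this
        // mapper are never consulted.
        ResultIterator iterator = buildResultIterator(queryPlan, scans); // stand-in helper
        // Wrap that iterator in a PhoenixResultSet, which nextKeyValue()/
        // getCurrentValue() then use to read fields.
        this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector(),
            queryPlan.getContext());
    } catch (SQLException e) {
        throw new IOException(e);
    }
}
{code}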

*Thus, setting up the parallel scans from the queryPlan in each mapper is not 
useful at all*. "getQueryPlan" should do what it says and only get you the 
query plan, not set up iterators.

Drawing a parallel to the way Phoenix-Spark does this currently, when the 
driver code plans the input partitions (analogous to InputFormat#getSplits in 
MR), we get a queryPlan and initialize it to set up the parallel scans (see 
[this|https://github.com/apache/phoenix-connectors/blob/master/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java#L159]),
 just like in the MR case. On the executors, when we initialize the 
PartitionReaders (analogous to RecordReader#initialize in MR), we create a 
queryPlan once again; however, we do not set up the parallel scans for the 
queryPlan like we do in the MR case (see 
[this|https://github.com/apache/phoenix-connectors/blob/master/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java#L84-L105]).
 Instead, we create a new iterator that's wrapped up in our resultSet (see 
[this|https://github.com/apache/phoenix-connectors/blob/master/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java#L139]),
 just like we do in the MR case.
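
So the executor-side contrast in Phoenix-Spark boils down to something like 
this (sketch only; buildResultSet and partitionScans are stand-ins for what the 
linked PhoenixInputPartitionReader code actually does):

{code:java}
// Sketch of the Phoenix-Spark executor-side initialization (simplified).
private void initialize() throws SQLException {
    // The plan is compiled again on each executor...
    QueryPlan queryPlan = getQueryPlan();
    // ...but queryPlan.iterator(MapReduceParallelScanGrouper.getInstance()) is
    // NOT called here, so the parallel scans are never recomputed from meta.
    // Iterators are built from the scans shipped with the InputPartition and
    // wrapped in a PhoenixResultSet, just like the MR RecordReader does.
    this.resultSet = buildResultSet(queryPlan, partitionScans); // stand-ins
}
{code}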

*Proposed Solution:* Overall, I don't think queryPlan generation itself is 
costly, so this can be called per mapper (we currently also call this per Spark 
executor in Phoenix-Spark). I'm not sure it's worth trying to serialize and 
pass the QueryPlan to each mapper. Instead, I propose some refactoring in 
PhoenixInputFormat so that "getQueryPlan" gets you a queryPlan without 
initializing the parallel scans. Then, we can avoid this unnecessary step in 
the mappers.
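
Concretely, the refactor could look something like this (just one possible 
shape, not a definitive implementation; compileSelectStatement and 
generateSplits are again stand-ins for the existing helpers):

{code:java}
// Possible refactoring sketch for PhoenixInputFormat (illustrative only).

// getQueryPlan() only compiles the SELECT; it no longer initializes the plan.
private QueryPlan getQueryPlan(Configuration conf) throws SQLException {
    return compileSelectStatement(conf); // stand-in for the existing compilation logic
}

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    final Configuration conf = context.getConfiguration();
    try {
        QueryPlan queryPlan = getQueryPlan(conf);
        // Only the driver sets up the parallel scans (and hence reads the
        // RegionLocations from meta), because only the driver needs them.
        queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
        return generateSplits(queryPlan, conf); // stand-in for the existing helper
    } catch (SQLException e) {
        throw new IOException(e);
    }
}

@Override
public RecordReader<NullWritable, T> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException {
    final Configuration conf = context.getConfiguration();
    try {
        // Mappers now get a plain, uninitialized QueryPlan: no parallel-scan
        // setup and no extra round trips to meta. The RecordReader keeps
        // building its own iterator from the scans carried by the split, as it
        // already does today.
        QueryPlan queryPlan = getQueryPlan(conf);
        return new PhoenixRecordReader<>(inputClass, conf, queryPlan);
    } catch (SQLException e) {
        throw new IOException(e);
    }
}
{code}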

Agreed, there may be cases wherein regions have moved/split/merged and data 
locality might suffer, but we would not get wrong results 
([relocateRegion|https://github.com/apache/hbase/blob/branch-1.3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java#L1150]
 would handle these cases; besides, this can happen with regular HBase clients 
as well). I had an offline discussion with [~apurtell] and overall, MR/Spark 
over snapshots is a much better way forward. This would ensure snapshot 
isolation and fixed partitions (specified in the manifest), making none of 
these factors an issue.

Let me know what you guys think [~gjacoby] [~tdsilva] [~jamestaylor] 
[~vincentpoon]

> All mappers grab all RegionLocations from .META
> -----------------------------------------------
>
>                 Key: PHOENIX-5313
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-5313
>             Project: Phoenix
>          Issue Type: Bug
>            Reporter: Geoffrey Jacoby
>            Assignee: Chinmay Kulkarni
>            Priority: Major
>
> Phoenix's MapReduce integration lives in PhoenixInputFormat. It implements 
> getSplits by calculating a QueryPlan for the provided SELECT query, and each 
> split gets a mapper. As part of this QueryPlan generation, we grab all 
> RegionLocations from .META
> In PhoenixInputFormat:getQueryPlan: 
> {code:java}
>  // Initialize the query plan so it sets up the parallel scans
>  queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
> {code}
> In MapReduceParallelScanGrouper.getRegionBoundaries()
> {code:java}
> return context.getConnection().getQueryServices().getAllTableRegions(tableName);
> {code}
> This is fine.
> Unfortunately, each mapper Task spawned by the job will go through this 
> _same_ exercise. It will pass a MapReduceParallelScanGrouper to 
> queryPlan.iterator(), which I believe eventually causes 
> getRegionBoundaries to be called when the scans are initialized in the 
> result iterator.
> Since HBase 1.x and up got rid of .META prefetching and caching within the 
> HBase client, this means that not only will each _Job_ make potentially 
> thousands of calls to .META, but potentially thousands of _Tasks_ will each 
> make potentially thousands of calls to .META as well. 
> We should get a QueryPlan and setup the scans without having to read all 
> RegionLocations, either by using the mapper's internal knowledge of its split 
> key range, or by serializing the query plan from the client and sending it to 
> the mapper tasks for use there. 
> Note that MapReduce tasks over snapshots are not affected by this, because 
> region locations are stored in the snapshot manifest. 


