[ https://issues.apache.org/jira/browse/PIG-4797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4797:
----------------------------------
    Attachment: PIG-4797_2.patch

Changes in PIG-4797_2.patch: fix the unit test failures introduced by PIG-4797_1.patch.
1. Fix the secondary key sort problem.
2. Fix multiquery optimization problems such as TestMultiQuery#testMultiQueryJiraPig1108.


Some points worth mentioning:
1. The objective of this patch is to remove unnecessary map operations from the original code for join and group cases.
Here is a join example:
{code}
cat bin/testFRJoin.pig
A = load './SkewedJoinInput1.txt' as (id,name,n);
B = load './SkewedJoinInput2.txt' as (id,name);
D = join A by (id,name), B by (id,name);
store D into './testFRJoin.out';
explain D;
{code}
Before PIG-4797:
{code}
RDD lineage: (1) MapPartitionsRDD[14] at map at StoreConverter.java:80 []
 |  MapPartitionsRDD[13] at mapPartitions at ForEachConverter.java:64 []
 |  MapPartitionsRDD[12] at map at PackageConverter.java:60 []
 |  MapPartitionsRDD[11] at map at GlobalRearrangeConverter.java:107 []
 |  CoGroupedRDD[10] at CoGroupedRDD at GlobalRearrangeConverter.java:102 []
 +-(1) MapPartitionsRDD[8] at map at GlobalRearrangeConverter.java:92 []
 |  |  MapPartitionsRDD[3] at map at LocalRearrangeConverter.java:48 []
 |  |  MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
 |  |  MapPartitionsRDD[1] at map at LoadConverter.java:127 []
 |  |  NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
 +-(1) MapPartitionsRDD[9] at map at GlobalRearrangeConverter.java:92 []
    |  MapPartitionsRDD[7] at map at LocalRearrangeConverter.java:48 []
    |  MapPartitionsRDD[6] at mapPartitions at ForEachConverter.java:64 []
    |  MapPartitionsRDD[5] at map at LoadConverter.java:127 []
    |  NewHadoopRDD[4] at newAPIHadoopRDD at LoadConverter.java:102 []
{code}

After PIG-4797, three map operations are eliminated (the work of MapPartitionsRDD 8, 9 and 11 in the lineage above is folded into neighboring maps):
{code}
RDD lineage: (1) MapPartitionsRDD[11] at map at StoreConverter.java:80 []
 |  MapPartitionsRDD[10] at mapPartitions at ForEachConverter.java:64 []
 |  MapPartitionsRDD[9] at map at JoinGroupSparkConverter.java:94 []
 |  CoGroupedRDD[8] at CoGroupedRDD at JoinGroupSparkConverter.java:89 []
 +-(1) MapPartitionsRDD[6] at map at JoinGroupSparkConverter.java:79 []
 |  |  MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
 |  |  MapPartitionsRDD[1] at map at LoadConverter.java:127 []
 |  |  NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
 +-(1) MapPartitionsRDD[7] at map at JoinGroupSparkConverter.java:79 []
    |  MapPartitionsRDD[5] at mapPartitions at ForEachConverter.java:64 []
    |  MapPartitionsRDD[4] at map at LoadConverter.java:127 []
{code}

Before: LocalRearrangeConverter#LocalRearrangeFunction converts a Tuple to Tuple(index,key,value), and then GlobalRearrangeConverter#ToKeyValueFunction converts Tuple(index,key,value) to Tuple2<IndexedKey,value>.
After: the two steps are combined in JoinGroupSparkConverter.LocalRearrangeFunction#LocalRearrangeFunction, which converts a Tuple directly to Tuple2<IndexedKey,value>.
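A minimal sketch (plain Java, no Spark) of why merging the two maps is safe: running LocalRearrange and then ToKeyValue as two chained maps yields the same pairs as one fused map, but the fused map skips building the intermediate Tuple(index,key,value) for every record. IndexedKey, Pair, the list-based tuples, treating field 0 as the key, and all method names here are hypothetical simplifications, not Pig's real classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class FusedRearrange {
    // Hypothetical stand-in for Pig's IndexedKey: input index plus join key.
    public record IndexedKey(int index, Object key) {}

    // Hypothetical stand-in for scala.Tuple2<IndexedKey, value>.
    public record Pair<K, V>(K first, V second) {}

    // Old map #1 (LocalRearrange): tuple -> [index, key, value].
    // Assumption: the join key is field 0 of the tuple.
    public static List<Object> localRearrange(int index, List<Object> tuple) {
        List<Object> out = new ArrayList<>();
        out.add(index);
        out.add(tuple.get(0));
        out.add(tuple);
        return out;
    }

    // Old map #2 (ToKeyValue): [index, key, value] -> (IndexedKey, value).
    @SuppressWarnings("unchecked")
    public static Pair<IndexedKey, List<Object>> toKeyValue(List<Object> t) {
        return new Pair<>(new IndexedKey((Integer) t.get(0), t.get(1)),
                          (List<Object>) t.get(2));
    }

    // Fused map: tuple -> (IndexedKey, value) directly, no intermediate tuple.
    public static Pair<IndexedKey, List<Object>> fused(int index, List<Object> tuple) {
        return new Pair<>(new IndexedKey(index, tuple.get(0)), tuple);
    }

    // Applies f to every record, standing in for one RDD map.
    public static <A, B> List<B> map(List<A> in, Function<A, B> f) {
        List<B> out = new ArrayList<>();
        for (A a : in) out.add(f.apply(a));
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> input = List.of(List.<Object>of(1, "a"), List.<Object>of(2, "b"));
        // Two chained maps, as before PIG-4797.
        List<Pair<IndexedKey, List<Object>>> twoMaps =
            map(map(input, t -> localRearrange(0, t)), FusedRearrange::toKeyValue);
        // One map, as after PIG-4797.
        List<Pair<IndexedKey, List<Object>>> oneMap = map(input, t -> fused(0, t));
        System.out.println(twoMaps.equals(oneMap)); // true
    }
}
```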

Before: GlobalRearrangeConverter#ToGroupKeyValueFunction converts Tuple2<IndexedKey,value> to a Tuple, and then PackageConverter#PackageFunction converts that Tuple into the result Tuple of the join or group via POPackage#getNextTuple().
After: the two steps are combined in GroupPkgFunction#GroupPkgFunction.
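A hedged sketch of what the fused GroupPkgFunction conceptually does for a join: take the cogrouped record (key plus one bag per input) and build the package result (key, bag_0, ..., bag_n) in a single step. It assumes that, as for an inner join, a key whose bag is empty on an inner input produces no output. CoGrouped and groupPkg are hypothetical names, and the real code works on Pig Tuples and DataBags rather than Java lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class GroupPkgSketch {
    // Hypothetical cogroup output: the group key plus one bag per input,
    // ordered by input index.
    public record CoGrouped(Object key, List<List<List<Object>>> bags) {}

    // Fused "to tuple + package" step: builds (key, bag_0, ..., bag_n) in one go.
    // Assumption: for an inner input, an empty bag means the key has no match,
    // so nothing is emitted.
    public static Optional<List<Object>> groupPkg(CoGrouped rec, boolean[] inner) {
        List<Object> out = new ArrayList<>();
        out.add(rec.key());
        for (int i = 0; i < rec.bags().size(); i++) {
            List<List<Object>> bag = rec.bags().get(i);
            if (inner[i] && bag.isEmpty()) {
                return Optional.empty();
            }
            out.add(bag);
        }
        // The bags are flattened later by the ForEach step (ForEachConverter
        // in the lineage above) to produce the joined rows.
        return Optional.of(out);
    }

    public static void main(String[] args) {
        List<List<Object>> a = List.of(List.<Object>of("k", 1));
        List<List<Object>> none = List.of();
        boolean[] inner = {true, true};
        System.out.println(groupPkg(new CoGrouped("k", List.of(a, a)), inner).get().size()); // 3
        System.out.println(groupPkg(new CoGrouped("k", List.of(a, none)), inner).isPresent()); // false
    }
}
```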



2. Why can't we use PlanOperator#getPredecessor(POJoinGroupSpark) to calculate the predecessors of POJoinGroupSpark?
Here is an example:
Before optimization:
{noformat}
POLoad(scope-1)                 POLoad(scope-2)
       |                               |
POForEach(scope-3)                     |
       |                               |
POLocalRearrange(scope-4)   POLocalRearrange(scope-5)
              \                     /
           POGlobalRearrange(scope-6)
                       |
               POPackage(scope-7)
{noformat}
After optimization:
{noformat}
POLoad(scope-1)                 POLoad(scope-2)
       |                               |
POForEach(scope-3)                     |
            \                         /
             POJoinGroupSpark(scope-8)
{noformat}
The predecessors of POJoinGroupSpark(scope-8) must be, in this order, POForEach(scope-3) and POLoad(scope-2), because they are the predecessors of POLocalRearrange(scope-4) and POLocalRearrange(scope-5) respectively. If we use OperatorPlan#getPredecessor(op) to get the predecessors and then sort them, we get POLoad(scope-2) and POForEach(scope-3) instead, which no longer matches the order of the POLocalRearranges.
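A small sketch of the ordering issue, with operators modelled as strings and the plan as a map from each POLocalRearrange to its single predecessor (both hypothetical simplifications). Walking the POLocalRearranges in input-index order keeps the predecessors aligned with the join inputs, while sorting the collected predecessors (modelled here as sorting by scope id) reverses them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class JoinPredecessors {
    // For each POLocalRearrange, in input-index order, take its single
    // predecessor. The position in the result must match the LR index,
    // so the order is preserved rather than sorted.
    public static List<String> predecessorsOfJoin(List<String> lrsInIndexOrder,
                                                  Map<String, String> predecessorOf) {
        List<String> preds = new ArrayList<>();
        for (String lr : lrsInIndexOrder) {
            preds.add(predecessorOf.get(lr));
        }
        return preds;
    }

    // Extracts N from "...(scope-N)".
    public static int scopeId(String op) {
        return Integer.parseInt(op.substring(op.indexOf("scope-") + 6, op.length() - 1));
    }

    public static void main(String[] args) {
        // The plan from the example, before optimization.
        Map<String, String> predecessorOf = Map.of(
            "POLocalRearrange(scope-4)", "POForEach(scope-3)",
            "POLocalRearrange(scope-5)", "POLoad(scope-2)");
        List<String> lrs = List.of("POLocalRearrange(scope-4)", "POLocalRearrange(scope-5)");

        List<String> correct = predecessorsOfJoin(lrs, predecessorOf);
        System.out.println(correct); // [POForEach(scope-3), POLoad(scope-2)]

        // Sorting the predecessors by scope id loses the LR order.
        List<String> sorted = new ArrayList<>(correct);
        sorted.sort((x, y) -> Integer.compare(scopeId(x), scopeId(y)));
        System.out.println(sorted); // [POLoad(scope-2), POForEach(scope-3)]
    }
}
```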
           
3. PigSecondaryKeyComparatorSpark is no longer used in JoinGroupSparkConverter. Instead, JoinGroupSparkConverter#handleSecondarySort uses JavaPairRDD#repartitionAndSortWithinPartitions(org.apache.spark.Partitioner) to handle the secondary key. We added IndexedKey.java#compareTo, which sorts by the first key and then by the secondary key.
How the secondary key case works: JoinGroupSparkConverter.LocalRearrangeFunction#LocalRearrangeFunction converts a Tuple to Tuple2<IndexedKey,value>; then JavaPairRDD#repartitionAndSortWithinPartitions(org.apache.spark.Partitioner) sorts the tuples; finally JoinGroupSparkConverter.RemoveSecondaryKey converts Tuple2<IndexedKey(first key, secondary key),value> to Tuple2<IndexedKey(first key only),value>.
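A local, Spark-free simulation of the secondary sort steps above: partition the (IndexedKey, value) pairs, sort each partition with a compareTo that orders by first key and then by secondary key, and strip the secondary key afterwards. The record types, the hash-on-first-key partitioner, and the method names are assumptions for illustration; in the patch the shuffle itself is done by Spark's JavaPairRDD#repartitionAndSortWithinPartitions.

```java
import java.util.ArrayList;
import java.util.List;

public class SecondarySortSketch {
    // Hypothetical simplified IndexedKey carrying both the first and secondary key.
    public record IndexedKey(int index, String firstKey, int secondaryKey)
            implements Comparable<IndexedKey> {
        // Sort by first key, then by secondary key.
        @Override
        public int compareTo(IndexedKey o) {
            int c = firstKey.compareTo(o.firstKey);
            return c != 0 ? c : Integer.compare(secondaryKey, o.secondaryKey);
        }
    }

    // Key after RemoveSecondaryKey: only the first key survives.
    public record GroupKey(int index, String firstKey) {}

    public record Pair<K, V>(K key, V value) {}

    // Local stand-in for repartitionAndSortWithinPartitions.
    // Assumption: the partitioner hashes the FIRST key only, so every record
    // of a group lands in the same partition.
    public static List<List<Pair<IndexedKey, String>>> repartitionAndSort(
            List<Pair<IndexedKey, String>> records, int numPartitions) {
        List<List<Pair<IndexedKey, String>>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (Pair<IndexedKey, String> r : records) {
            int p = Math.floorMod(r.key().firstKey().hashCode(), numPartitions);
            parts.get(p).add(r);
        }
        for (List<Pair<IndexedKey, String>> part : parts) {
            part.sort((x, y) -> x.key().compareTo(y.key()));
        }
        return parts;
    }

    // Stand-in for RemoveSecondaryKey: drop the secondary key after sorting.
    public static Pair<GroupKey, String> removeSecondaryKey(Pair<IndexedKey, String> r) {
        return new Pair<>(new GroupKey(r.key().index(), r.key().firstKey()), r.value());
    }

    public static void main(String[] args) {
        List<Pair<IndexedKey, String>> records = List.of(
            new Pair<>(new IndexedKey(0, "a", 3), "a3"),
            new Pair<>(new IndexedKey(0, "b", 1), "b1"),
            new Pair<>(new IndexedKey(0, "a", 1), "a1"),
            new Pair<>(new IndexedKey(0, "a", 2), "a2"));
        for (List<Pair<IndexedKey, String>> part : repartitionAndSort(records, 2)) {
            List<String> values = new ArrayList<>();
            for (Pair<IndexedKey, String> r : part) {
                values.add(removeSecondaryKey(r).value());
            }
            System.out.println(values); // [b1] then [a1, a2, a3]
        }
    }
}
```

Partitioning on the first key only is what keeps each group inside a single partition; if the partitioner used the full key, records of one group could be split across partitions.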


> Analyze JOIN performance and improve the same.
> ----------------------------------------------
>
>                 Key: PIG-4797
>                 URL: https://issues.apache.org/jira/browse/PIG-4797
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Pallavi Rao
>            Assignee: liyunzhang_intel
>              Labels: spork
>         Attachments: Join performance analysis.pdf, PIG-4797.patch, PIG-4797_2.patch
>
>
> There is a big performance difference in join between spark and mr mode.
> {code}
> daily = load './NYSE_daily' as (exchange:chararray, symbol:chararray,
>             date:chararray, open:float, high:float, low:float,
>             close:float, volume:int, adj_close:float);
> divs  = load './NYSE_dividends' as (exchange:chararray, symbol:chararray,
>             date:chararray, dividends:float);
> jnd   = join daily by (exchange, symbol), divs by (exchange, symbol);
> store jnd into './join.out';
> {code}
> join.sh
> {code}
> mode=$1
> start=$(date +%s)
> ./pig -x $mode  $PIG_HOME/bin/join.pig
> end=$(date +%s)
> execution_time=$(( $end - $start ))
> echo "execution_time:"$execution_time
> {code}
> The execution time:
> || ||mr||spark||
> |join|20 sec|79 sec|
> You can download the test data NYSE_daily and NYSE_dividends from
> https://github.com/alanfgates/programmingpig/blob/master/data/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
