[
https://issues.apache.org/jira/browse/PIG-4797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
liyunzhang_intel updated PIG-4797:
----------------------------------
Attachment: PIG-4797_2.patch
Changes in PIG-4797_2.patch:
It fixes unit test failures introduced by PIG-4797_1.patch:
1. the secondary key sort problem;
2. multiquery optimization problems, such as TestMultiQuery#testMultiQueryJiraPig1108.
Some points worth mentioning:
1. The goal of this patch is to remove unnecessary map operations from the original code in join and group cases.
Here is a join example:
{code}
cat bin/testFRJoin.pig
A = load './SkewedJoinInput1.txt' as (id,name,n);
B = load './SkewedJoinInput2.txt' as (id,name);
D = join A by (id,name), B by (id,name);
store D into './testFRJoin.out';
explain D;
{code}
Before PIG-4797:
{code}
RDD lineage: (1) MapPartitionsRDD[14] at map at StoreConverter.java:80 []
| MapPartitionsRDD[13] at mapPartitions at ForEachConverter.java:64 []
| MapPartitionsRDD[12] at map at PackageConverter.java:60 []
| MapPartitionsRDD[11] at map at GlobalRearrangeConverter.java:107 []
| CoGroupedRDD[10] at CoGroupedRDD at GlobalRearrangeConverter.java:102 []
+-(1) MapPartitionsRDD[8] at map at GlobalRearrangeConverter.java:92 []
| | MapPartitionsRDD[3] at map at LocalRearrangeConverter.java:48 []
| | MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
| | MapPartitionsRDD[1] at map at LoadConverter.java:127 []
| | NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
+-(1) MapPartitionsRDD[9] at map at GlobalRearrangeConverter.java:92 []
| MapPartitionsRDD[7] at map at LocalRearrangeConverter.java:48 []
| MapPartitionsRDD[6] at mapPartitions at ForEachConverter.java:64 []
| MapPartitionsRDD[5] at map at LoadConverter.java:127 []
| NewHadoopRDD[4] at newAPIHadoopRDD at LoadConverter.java:102 []
{code}
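After PIG-4797, three map operations are gone (MapPartitionsRDD 8, 9 and 11 in the lineage above): on each input branch the LocalRearrangeConverter and GlobalRearrangeConverter maps are merged into one map, and the GlobalRearrangeConverter and PackageConverter maps after the CoGroupedRDD are merged into one.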
{code}
RDD lineage: (1) MapPartitionsRDD[11] at map at StoreConverter.java:80 []
| MapPartitionsRDD[10] at mapPartitions at ForEachConverter.java:64 []
| MapPartitionsRDD[9] at map at JoinGroupSparkConverter.java:94 []
| CoGroupedRDD[8] at CoGroupedRDD at JoinGroupSparkConverter.java:89 []
+-(1) MapPartitionsRDD[6] at map at JoinGroupSparkConverter.java:79 []
| | MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
| | MapPartitionsRDD[1] at map at LoadConverter.java:127 []
| | NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
+-(1) MapPartitionsRDD[7] at map at JoinGroupSparkConverter.java:79 []
| MapPartitionsRDD[5] at mapPartitions at ForEachConverter.java:64 []
| MapPartitionsRDD[4] at map at LoadConverter.java:127 []
{code}
Before, LocalRearrangeConverter#LocalRearrangeFunction converted a Tuple to Tuple(index,key,value), and GlobalRearrangeConverter#ToKeyValueFunction then converted Tuple(index,key,value) to Tuple2<IndexedKey,value>.
Now these two steps are combined in JoinGroupSparkConverter.LocalRearrangeFunction#LocalRearrangeFunction, which converts a Tuple directly to Tuple2<IndexedKey,value>.
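A minimal plain-Java sketch of why the fusion saves a stage (no Spark dependency). IndexedKey here is a hypothetical stand-in (input index plus key), tuples are modelled as List<Object>, Map.Entry plays the role of Tuple2, and it is assumed that the key is field 0 and the whole tuple is kept as the value. The two-step path builds an intermediate (index, key, value) tuple; the fused path emits the pair directly, and both give the same pair.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class FusedLocalRearrange {
    // Hypothetical stand-in for Pig's IndexedKey: the input index plus the join/group key.
    record IndexedKey(int index, Object key) {}

    // Before, step 1 (local rearrange): Tuple -> Tuple(index, key, value).
    // Assumption: the key is field 0 and the whole tuple is kept as the value.
    static Function<List<Object>, List<Object>> localRearrange(int index) {
        return t -> List.of(index, t.get(0), t);
    }

    // Before, step 2 (to key/value): Tuple(index, key, value) -> (IndexedKey, value).
    static Map.Entry<IndexedKey, Object> toKeyValue(List<Object> t) {
        return Map.entry(new IndexedKey((Integer) t.get(0), t.get(1)), t.get(2));
    }

    // After: one function builds (IndexedKey, value) directly, with no intermediate tuple.
    static Function<List<Object>, Map.Entry<IndexedKey, Object>> fused(int index) {
        return t -> Map.entry(new IndexedKey(index, t.get(0)), (Object) t);
    }

    public static void main(String[] args) {
        List<Object> tuple = List.<Object>of("id1", "alice", 3);
        Map.Entry<IndexedKey, Object> twoStep =
                localRearrange(0).andThen(FusedLocalRearrange::toKeyValue).apply(tuple);
        Map.Entry<IndexedKey, Object> oneStep = fused(0).apply(tuple);
        System.out.println(twoStep.equals(oneStep)); // prints "true"
    }
}
```

In Spark terms, each composed function becomes one map over the RDD instead of two, which is where the saved MapPartitionsRDD comes from.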
Before, GlobalRearrangeConverter#ToGroupKeyValueFunction converted Tuple2<IndexedKey,value> to a Tuple, and PackageConverter#PackageFunction converted that Tuple into the result of the join or group via POPackage#getNextTuple().
Now these two steps are combined in GroupPkgFunction#GroupPkgFunction.
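To make the package step concrete, here is a plain-Java sketch of what a fused package function computes for a two-way inner join once the co-grouped bags for one key are available: the cross product of the per-input bags. The name joinPackage is hypothetical, and this does not mirror POPackage's real code path, which also covers group and outer-join cases.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupPkgSketch {
    // Hypothetical fused "package" step for a two-way inner join. Input: the bags the
    // co-group produced for one key (one bag per input). Output: the joined rows for that key.
    static List<List<Object>> joinPackage(List<List<Object>> leftBag, List<List<Object>> rightBag) {
        List<List<Object>> rows = new ArrayList<>();
        // Inner join: every left row pairs with every right row; an empty bag yields no rows.
        for (List<Object> left : leftBag) {
            for (List<Object> right : rightBag) {
                List<Object> row = new ArrayList<>(left);
                row.addAll(right);
                rows.add(row);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<List<Object>> a = List.of(List.<Object>of("id1", "alice", 3));
        List<List<Object>> b = List.of(List.<Object>of("id1", "x"), List.<Object>of("id1", "y"));
        System.out.println(joinPackage(a, b));
        // prints [[id1, alice, 3, id1, x], [id1, alice, 3, id1, y]]
    }
}
```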
2. Why can't we use PlanOperator#getPredecessor(POJoinGroupSpark) to calculate the predecessors of POJoinGroupSpark?
Here is an example:
Before optimization:
{noformat}
POLoad(scope-1)               POLoad(scope-2)
      |                             |
POForEach(scope-3)                  |
      |                             |
POLocalRearrange(scope-4)     POLocalRearrange(scope-5)
              \                   /
           POGlobalRearrange(scope-6)
                      |
               POPackage(scope-7)
{noformat}
After optimization:
{noformat}
POLoad(scope-1)               POLoad(scope-2)
      |                             |
POForEach(scope-3)                  |
              \                    /
           POJoinGroupSpark(scope-8)
{noformat}
The predecessors of POJoinGroupSpark(scope-8) must be, in this order, POForEach(scope-3) and POLoad(scope-2), because they are the predecessors of POLocalRearrange(scope-4) and POLocalRearrange(scope-5) respectively, and that order determines which input each branch corresponds to. If we used OperatorPlan#getPredecessor(op) to get the predecessors and then sorted them, we would get POLoad(scope-2) before POForEach(scope-3), i.e. the wrong order.
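The ordering problem can be reproduced with a small plain-Java model. Op and both helper methods are hypothetical, and sorting by scope id is an assumption about how "sort predecessors" orders operators. The sketch only shows that the order has to come from the POLocalRearrange operators, not from the predecessors' own ids.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class PredecessorOrder {
    // Hypothetical minimal operator: a name plus its scope id.
    record Op(String name, int scope) {}

    // Follow each local rearrange (given in input order) back to its single predecessor.
    static List<Op> fromLocalRearranges(List<Op> localRearranges, Map<Op, Op> predecessorOf) {
        List<Op> result = new ArrayList<>();
        for (Op lr : localRearranges) {
            result.add(predecessorOf.get(lr));
        }
        return result;
    }

    // The tempting alternative: take the new operator's predecessors and sort by scope id.
    static List<Op> sortedByScope(List<Op> predecessors) {
        List<Op> result = new ArrayList<>(predecessors);
        result.sort(Comparator.comparingInt(Op::scope));
        return result;
    }

    public static void main(String[] args) {
        Op load2 = new Op("POLoad", 2);
        Op forEach3 = new Op("POForEach", 3);
        Op lr4 = new Op("POLocalRearrange", 4);
        Op lr5 = new Op("POLocalRearrange", 5);
        // Edges from the plan before optimization.
        Map<Op, Op> predecessorOf = Map.of(lr4, forEach3, lr5, load2);

        // Input 0 is scope-3, input 1 is scope-2: matches the local rearranges.
        System.out.println(fromLocalRearranges(List.of(lr4, lr5), predecessorOf));
        // Sorting puts scope-2 first, which swaps the two inputs.
        System.out.println(sortedByScope(List.of(forEach3, load2)));
    }
}
```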
3. PigSecondaryKeyComparatorSpark is no longer used in JoinGroupSparkConverter. Instead, JoinGroupSparkConverter#handleSecondarySort uses JavaPairRDD#repartitionAndSortWithinPartitions(org.apache.spark.Partitioner) to handle the secondary key. For this we added IndexedKey.java#compareTo (sort by the first key, then by the secondary key), since this overload of repartitionAndSortWithinPartitions sorts records by their keys' natural ordering.
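A local, Spark-free simulation of the idea, as a minimal sketch under stated assumptions: IndexedKey here is a hypothetical two-string key rather than Pig's class, and the partitioner that hashes only the first key is an assumption (not stated in the patch notes) about what keeps all records of one group in one partition. repartitionAndSort mimics the effect of repartitionAndSortWithinPartitions: bucket by partition, then sort each bucket by natural ordering.

```java
import java.util.ArrayList;
import java.util.List;

public class SecondarySortSketch {
    // Hypothetical composite key; compareTo orders by first key, then by secondary key.
    record IndexedKey(String firstKey, String secondaryKey) implements Comparable<IndexedKey> {
        @Override
        public int compareTo(IndexedKey other) {
            int c = firstKey.compareTo(other.firstKey());
            return c != 0 ? c : secondaryKey.compareTo(other.secondaryKey());
        }
    }

    // Assumed partitioner: hash only the first key so a whole group lands in one partition.
    static int partitionOf(IndexedKey k, int numPartitions) {
        return Math.floorMod(k.firstKey().hashCode(), numPartitions);
    }

    // Local stand-in for repartitionAndSortWithinPartitions: bucket, then sort each bucket.
    static List<List<IndexedKey>> repartitionAndSort(List<IndexedKey> keys, int numPartitions) {
        List<List<IndexedKey>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (IndexedKey k : keys) {
            parts.get(partitionOf(k, numPartitions)).add(k);
        }
        for (List<IndexedKey> p : parts) {
            p.sort(null); // null comparator: use the natural ordering from compareTo
        }
        return parts;
    }

    public static void main(String[] args) {
        List<IndexedKey> keys = List.of(
                new IndexedKey("b", "2"), new IndexedKey("a", "9"),
                new IndexedKey("b", "1"), new IndexedKey("a", "3"));
        System.out.println(repartitionAndSort(keys, 2));
    }
}
```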
How the secondary key case works: JoinGroupSparkConverter.LocalRearrangeFunction#LocalRearrangeFunction converts a Tuple to Tuple2<IndexedKey,value>; JavaPairRDD#repartitionAndSortWithinPartitions(org.apache.spark.Partitioner) then sorts the tuples; finally JoinGroupSparkConverter.RemoveSecondaryKey converts Tuple2<IndexedKey(with first key and secondary key), value> to Tuple2<IndexedKey(with first key only), value>.
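The sort-then-strip flow can be traced on a few records with this plain-Java sketch. It is not the Spark code: CompositeKey and sortAndStrip are hypothetical names, the in-memory sort stands in for repartitionAndSortWithinPartitions, and dropping the secondary key stands in for JoinGroupSparkConverter.RemoveSecondaryKey.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class RemoveSecondaryKeySketch {
    // Hypothetical composite key: (first key, secondary key).
    record CompositeKey(String first, int secondary) {}

    // Sort by first key, then secondary key, then drop the secondary key so that
    // downstream operators only see the first key.
    static List<Map.Entry<String, String>> sortAndStrip(List<Map.Entry<CompositeKey, String>> records) {
        List<Map.Entry<CompositeKey, String>> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparing((Map.Entry<CompositeKey, String> e) -> e.getKey().first())
                .thenComparingInt(e -> e.getKey().secondary()));
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<CompositeKey, String> e : sorted) {
            // Stand-in for RemoveSecondaryKey: keep only the first key; value order is preserved.
            out.add(Map.entry(e.getKey().first(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<CompositeKey, String>> in = List.of(
                Map.entry(new CompositeKey("IBM", 2010), "row-2010"),
                Map.entry(new CompositeKey("IBM", 2008), "row-2008"),
                Map.entry(new CompositeKey("AAPL", 2009), "row-2009"));
        System.out.println(sortAndStrip(in));
    }
}
```

Because the secondary key is removed only after sorting, the records for one first key reach the package step already ordered by the secondary key.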
> Analyze JOIN performance and improve the same.
> ----------------------------------------------
>
> Key: PIG-4797
> URL: https://issues.apache.org/jira/browse/PIG-4797
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Pallavi Rao
> Assignee: liyunzhang_intel
> Labels: spork
> Attachments: Join performance analysis.pdf, PIG-4797.patch,
> PIG-4797_2.patch
>
>
> There is a big performance difference in join between spark and mr mode.
> {code}
> daily = load './NYSE_daily' as (exchange:chararray, symbol:chararray,
> date:chararray, open:float, high:float, low:float,
> close:float, volume:int, adj_close:float);
> divs = load './NYSE_dividends' as (exchange:chararray, symbol:chararray,
> date:chararray, dividends:float);
> jnd = join daily by (exchange, symbol), divs by (exchange, symbol);
> store jnd into './join.out';
> {code}
> join.sh
> {code}
> mode=$1
> start=$(date +%s)
> ./pig -x $mode $PIG_HOME/bin/join.pig
> end=$(date +%s)
> execution_time=$(( $end - $start ))
> echo "execution_time: $execution_time"
> {code}
> The execution time:
> || ||mr||spark||
> |join|20 sec|79 sec|
> You can download the test data NYSE_daily and NYSE_dividends from
> https://github.com/alanfgates/programmingpig/blob/master/data/
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)