[ 
https://issues.apache.org/jira/browse/PIG-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15097955#comment-15097955
 ] 

liyunzhang_intel commented on PIG-4601:
---------------------------------------

*Let's work through an example to explain the implementation of this feature:*
mergeCogroup.pig
{code}
REGISTER myudfs.jar;
A = load './cogroupMerge1.txt' using myudfs.DummyCollectableLoader() as 
(c1:chararray,c2:chararray);
B = load './cogroupMerge2.txt' using myudfs.DummyIndexableLoader() as 
(c1:chararray,c2:chararray);
C = cogroup A by (c1,c2), B by (c1,c2) using 'merge';
store C into './cogroupMerge.out';
{code}

The spark plan will be:
{code}
scope-57
scope-59->scope-57 
#--------------------------------------------------
# Spark Plan                                  
#--------------------------------------------------

Spark node scope-59
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1195388614/tmp-117717770:org.apache.pig.impl.io.InterStorage)
 - scope-69
|
|---POSort[tuple]() - scope-68
    |   |
    |   Project[tuple][*] - scope-67
    |
    
|---Load(hdfs://zly1.sh.intel.com:8020/user/root/multiSplits1.txt:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('myudfs.DummyCollectableLoader','eNqt......dcBFu','A_1-9','scope','false'))
 - scope-66--------

Spark node scope-57
C: 
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiSplits.out:org.apache.pig.builtin.PigStorage)
 - scope-56
|
|---C: MergeCogroup[tuple] - scope-55
    |
    |---A: New For Each(false,false)[bag] - scope-42
        |   |
        |   Cast[chararray] - scope-37
        |   |
        |   |---Project[bytearray][0] - scope-36
        |   |
        |   Cast[int] - scope-40
        |   |
        |   |---Project[bytearray][1] - scope-39
        |
        |---A: 
Load(hdfs://zly1.sh.intel.com:8020/user/root/multiSplits1.txt:myudfs.DummyCollectableLoader)
 - scope-35-----
{code}
There are two Spark nodes (scope-59, scope-57):
1. scope-59: loads the big dataset (multiSplits1.txt) and uses MergeJoinIndexer to 
generate the index.
  The format of each index entry is
  (key0, key1,...,position,splitIndex)
  We need to sort the result here (POSort scope-68) because it is not sorted by 
key: MergeJoinIndexer reads each PigSplit of the big dataset but cannot 
guarantee that the PigSplits are read in sequence, so there is a possibility 
that a PigSplit with a bigger key appears before a PigSplit with a smaller key.
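To illustrate why the sort is needed (with made-up keys, not actual MergeJoinIndexer output), the index entries arrive in split-read order and must be reordered on the key fields before they can drive the merge:

```python
# Hypothetical index entries in the (key0, key1, position, splitIndex)
# format; the key values here are invented for illustration. Entries
# arrive in whatever order the PigSplits happen to be read, which need
# not be key order.
index = [
    ("2", "b", 0, 1),  # (key0, key1, position, splitIndex)
    ("1", "a", 0, 0),  # the split with the smaller key was read later
    ("3", "c", 0, 2),
]

# What POSort (scope-68) effectively does: order the entries by the
# composite key so the index can be scanned in key order.
index.sort(key=lambda entry: entry[:2])
```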
2. scope-57: loads the big dataset according to the index file and loads the 
small dataset into memory; MergeCogroupConverter reuses the algorithm from 
POMergeCogroup to group the two datasets in memory.
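The in-memory grouping over two key-sorted inputs can be sketched as follows; this is a simplified Python rendering of the merge idea, not the actual Java code in POMergeCogroup/MergeCogroupConverter:

```python
from itertools import groupby
from operator import itemgetter

def merge_cogroup(left, right):
    """Cogroup two inputs that are already sorted by key, with no shuffle.

    left/right: lists of (key, value) tuples, sorted by key.
    Returns a list of (key, left_bag, right_bag), walking both inputs
    once; this single pass is how a merge cogroup avoids the reduce phase.
    """
    lgroups = [(k, [v for _, v in g]) for k, g in groupby(left, key=itemgetter(0))]
    rgroups = [(k, [v for _, v in g]) for k, g in groupby(right, key=itemgetter(0))]
    i = j = 0
    out = []
    while i < len(lgroups) or j < len(rgroups):
        if j == len(rgroups) or (i < len(lgroups) and lgroups[i][0] < rgroups[j][0]):
            out.append((lgroups[i][0], lgroups[i][1], []))   # key only on the left
            i += 1
        elif i == len(lgroups) or rgroups[j][0] < lgroups[i][0]:
            out.append((rgroups[j][0], [], rgroups[j][1]))   # key only on the right
            j += 1
        else:                                                # key on both sides
            out.append((lgroups[i][0], lgroups[i][1], rgroups[j][1]))
            i += 1
            j += 1
    return out
```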


*About the modification to TestMapSideCogroup#testMultiSplits*
I modified the result-comparison code of TestMapSideCogroup#testMultiSplits in 
the patch because the output order differs between Spark and MR mode in this 
case. The case looks like the following:
{code}
REGISTER myudfs.jar;
A = load './multiSplits1.txt,./multiSplits3.txt' using 
myudfs.DummyCollectableLoader() as (c1:chararray,c2:int);
B = load './multiSplits2.txt' using myudfs.DummyIndexableLoader() as 
(c1:chararray,c2:int);
C = cogroup A by c1, B by c1 using 'merge';
store C into './multiSplits.out';
{code}

{code}
 cat bin/multiSplits1.txt :
1     1
1     2
1     3
2     1
2     2
2     3
3     1
3     2
3     3
{code}
{code}
cat bin/multiSplits3.txt :
4     1
4     2
4     3
5     1
5     2
5     3
6     1
6     2
6     3
7     1
7     2
7     3
8     1
8     2
8     3
9     1
9     2
9     3
{code}
{code}
 cat bin/multiSplits2.txt 
3       1
3       2
3       3
4       1
4       2
4       3
5       1
5       2
5       3
{code}

The result order in Spark and MR mode is different. Interestingly, in MR mode 
the second file is loaded before the first one when multiple load paths are 
used (A = load 'first,second' using PigStorage ...).
MR:
{code}
4,{(4,1),(4,2),(4,3)},{(4,1),(4,2),(4,3)}
5,{(5,2),(5,1),(5,3)},{(5,1),(5,2),(5,3)}
6,{(6,1),(6,2),(6,3)},{}
7,{(7,1),(7,2),(7,3)},{}
8,{(8,1),(8,2),(8,3)},{}
9,{(9,1),(9,2),(9,3)},{}
1,{(1,1),(1,2),(1,3)},{}
2,{(2,1),(2,2),(2,3)},{}
3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)}
{code}

Spark:
{code}
1,{(1,1),(1,2),(1,3)},{}
2,{(2,1),(2,2),(2,3)},{}
3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)}
4,{(4,1),(4,2),(4,3)},{(4,1),(4,2),(4,3)}
5,{(5,2),(5,1),(5,3)},{(5,1),(5,2),(5,3)}
6,{(6,1),(6,2),(6,3)},{}
7,{(7,1),(7,2),(7,3)},{}
8,{(8,1),(8,2),(8,3)},{}
9,{(9,1),(9,2),(9,3)},{}
{code}
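Since both engines produce the same groups and only the ordering differs, the comparison in the test can be made order-insensitive. A minimal sketch of that idea (a hypothetical helper, not the actual patch code):

```python
def same_results(actual, expected):
    """Compare cogroup outputs ignoring row order and bag order.

    Each row is (key, bag1, bag2); bags are compared as sorted lists,
    since tuple order inside a bag is not guaranteed either.
    """
    normalize = lambda rows: sorted((k, sorted(b1), sorted(b2)) for k, b1, b2 in rows)
    return normalize(actual) == normalize(expected)
```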



> Implement Merge CoGroup for Spark engine
> ----------------------------------------
>
>                 Key: PIG-4601
>                 URL: https://issues.apache.org/jira/browse/PIG-4601
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>    Affects Versions: spark-branch
>            Reporter: Mohit Sabharwal
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>
> When doing a cogroup operation, we normally need a full map-reduce. The goal of 
> merge cogroup is to implement cogroup in a single (map-only) stage, but we need 
> to guarantee the input data are sorted.
> There is a performance improvement when A (a big dataset) merge-cogroups 
> B (a small dataset): we first generate an index file for A, then load A 
> according to the index file and load B into memory to do the cogroup. Performance 
> improves because there is no reduce phase, compared with a regular cogroup.
> How to use
> {code}
> C = cogroup A by c1, B by c1 using 'merge';
> {code}
> Here A and B are sorted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
