Hi all:
   I want to ask a question about POSplit. A new MR operator is generated when a
POSplit is encountered:

MRCompiler#visitSplit

public void visitSplit(POSplit op) throws VisitorException{
    try{
        FileSpec fSpec = op.getSplitStore();
        // End the current MapReduceOper with a store to the split's temp file
        MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
        mro.setSplitter(true);
        splitsSeen.put(op.getOperatorKey(), mro);
        // Start a new MapReduceOper that loads the temp file; the split's
        // successors are compiled into this new operator
        curMROp = startNew(fSpec, mro);
        phyToMROpMap.put(op, curMROp);
    }catch(Exception e){
        int errCode = 2034;
        String msg = "Error compiling operator " +
                op.getClass().getSimpleName();
        throw new MRCompilerException(msg, errCode, PigException.BUG, e);
    }
}
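To make the effect concrete, here is a toy model of what visitSplit does to a linear pipeline. Everything compiled before the POSplit is closed off with a store to a temp file, and the split's successors start a new operator that loads that file. All class and method names here are hypothetical (this is not Pig's API), and the model ignores multi-input operators, combiners and the MR optimizers.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Pig's API) of how visitSplit cuts a linear
// physical pipeline into MapReduce operators.
class SplitCutSketch {

    // Hypothetical stand-in for MapReduceOper: operator names per phase.
    static final class MROp {
        final List<String> map = new ArrayList<>();
        final List<String> reduce = new ArrayList<>();
        boolean splitter = false;
        boolean inReduce = false; // set once the shuffle (Global Rearrange) is crossed

        void add(String op) {
            (inReduce ? reduce : map).add(op);
        }

        @Override
        public String toString() {
            return "map=" + map + " reduce=" + reduce + (splitter ? " splitter" : "");
        }
    }

    // Operators are listed bottom-up (Load first), in the order they are compiled.
    static List<MROp> compile(List<String> pipeline) {
        List<MROp> ops = new ArrayList<>();
        MROp cur = new MROp();
        for (String op : pipeline) {
            if (op.equals("GlobalRearrange")) {
                cur.inReduce = true;        // later operators run in the reducer
            } else if (op.equals("Split")) {
                cur.add("Store(tmp)");      // like endSingleInputPlanWithStr(fSpec)
                cur.splitter = true;        // like mro.setSplitter(true)
                ops.add(cur);
                cur = new MROp();           // like startNew(fSpec, mro)
                cur.add("Load(tmp)");
            } else {
                cur.add(op);
            }
        }
        ops.add(cur);
        return ops;
    }

    public static void main(String[] args) {
        // The D branch of testAccumulator.join.pig, heavily simplified.
        List<String> pipeline = List.of("Load", "LocalRearrange(C)", "GlobalRearrange",
                "Package(C)", "Split", "ForEach(D) with POSort");
        for (MROp op : compile(pipeline)) {
            System.out.println(op);
        }
        // map=[Load, LocalRearrange(C)] reduce=[Package(C), Store(tmp)] splitter
        // map=[Load(tmp), ForEach(D) with POSort] reduce=[]
    }
}
```

The point of the model: the group's Package and the nested-sort ForEach end up in two different operators as soon as a Split sits between them.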


Here is a Pig script which suggests that a new MR operator should not be generated
when a POSplit is encountered:

testAccumulator.join.pig



REGISTER /home/zly/prj/oss/kellyzly/pig/bin/myudfs.jar;
A = load './testAccumulator.txt' as (id:int,f);
B = foreach A generate id, f, id as t;
C = group B by id;
D = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};
G = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};
H = join D by group, G by group;
store H into 'testAccumulator.join.out';
explain H;


Physical plan:
An implicit POSplit (C: Split - scope-19) is generated in the physical plan,
because C is consumed by both D and G.

#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
    |   |
    |   Project[bag][1] - scope-54
    |   |
    |   Project[bag][2] - scope-55
    |
    |---H: Package(Packager)[tuple]{int} - scope-49
        |
        |---H: Global Rearrange[tuple] - scope-48
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-50
            |   |   |
            |   |   Project[int][0] - scope-51
            |   |
            |   |---D: New For Each(false,false)[bag] - scope-32
            |       |   |
            |       |   Project[int][0] - scope-22
            |       |   |
            |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
            |       |   |
            |       |   |---RelationToExpressionProject[bag][*] - scope-24
            |       |       |
            |       |       |---F: New For Each(false)[bag] - scope-31
            |       |           |   |
            |       |           |   Project[bytearray][1] - scope-29
            |       |           |
            |       |           |---E: POSort[bag]() - scope-28
            |       |               |   |
            |       |               |   Project[bytearray][1] - scope-27
            |       |               |
            |       |               |---Project[bag][1] - scope-26
            |       |
            |       |---C: Filter[bag] - scope-20
            |           |   |
            |           |   Constant(true) - scope-21
            |           |
            |           |---C: Split - scope-19  // an implicit Split is generated
            |               |
            |               |---C: Package(Packager)[tuple]{int} - scope-16
            |                   |
            |                   |---C: Global Rearrange[tuple] - scope-15
            |                       |
            |                       |---C: Local Rearrange[tuple]{int}(false) - scope-17
            |                           |   |
            |                           |   Project[int][0] - scope-18
            |                           |
            |                           |---B: New For Each(false,false,false)[bag] - scope-14
            |                               |   |
            |                               |   Project[int][0] - scope-7
            |                               |   |
            |                               |   Project[bytearray][1] - scope-9
            |                               |   |
            |                               |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
            |                               |   |
            |                               |   |---Project[int][0] - scope-11
            |                               |
            |                               |---A: New For Each(false,false)[bag] - scope-6
            |                                   |   |
            |                                   |   Cast[int] - scope-2
            |                                   |   |
            |                                   |   |---Project[bytearray][0] - scope-1
            |                                   |   |
            |                                   |   Project[bytearray][1] - scope-4
            |                                   |
            |                                   |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-52
                |   |
                |   Project[int][0] - scope-53
                |
                |---G: New For Each(false,false)[bag] - scope-45
                    |   |
                    |   Project[int][0] - scope-35
                    |   |
                    |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
                    |   |
                    |   |---RelationToExpressionProject[bag][*] - scope-37
                    |       |
                    |       |---F: New For Each(false)[bag] - scope-44
                    |           |   |
                    |           |   Project[bytearray][1] - scope-42
                    |           |
                    |           |---E: POSort[bag]() - scope-41
                    |               |   |
                    |               |   Project[bytearray][1] - scope-40
                    |               |
                    |               |---Project[bag][1] - scope-39
                    |
                    |---C: Filter[bag] - scope-33
                        |   |
                        |   Constant(true) - scope-34
                        |
                        |---C: Split - scope-19
                            |
                            |---C: Package(Packager)[tuple]{int} - scope-16
                                |
                                |---C: Global Rearrange[tuple] - scope-15
                                    |
                                    |---C: Local Rearrange[tuple]{int}(false) - scope-17
                                        |   |
                                        |   Project[int][0] - scope-18
                                        |
                                        |---B: New For Each(false,false,false)[bag] - scope-14
                                            |   |
                                            |   Project[int][0] - scope-7
                                            |   |
                                            |   Project[bytearray][1] - scope-9
                                            |   |
                                            |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
                                            |   |
                                            |   |---Project[int][0] - scope-11
                                            |
                                            |---A: New For Each(false,false)[bag] - scope-6
                                                |   |
                                                |   Cast[int] - scope-2
                                                |   |
                                                |   |---Project[bytearray][0] - scope-1
                                                |   |
                                                |   Project[bytearray][1] - scope-4
                                                |
                                                |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
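The split exists because C has two consumers (D and G). Below is a toy sketch of that rule, under the assumption that a split is needed exactly when a relation feeds more than one consumer. The class and method names are hypothetical; this is not Pig's actual implicit-split insertion code, and nested relations such as E and F are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical, not Pig's API): each relation maps to the relations
// it reads from. A relation read by more than one consumer needs a split.
class ImplicitSplitSketch {

    static List<String> relationsNeedingSplit(Map<String, List<String>> inputsOf) {
        // Count how many consumers read each relation (TreeMap keeps output sorted).
        Map<String, Integer> consumerCount = new TreeMap<>();
        for (List<String> inputs : inputsOf.values()) {
            for (String input : inputs) {
                consumerCount.merge(input, 1, Integer::sum);
            }
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : consumerCount.entrySet()) {
            if (e.getValue() > 1) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Top-level relations of testAccumulator.join.pig (nested E and F omitted).
        Map<String, List<String>> inputsOf = new TreeMap<>();
        inputsOf.put("B", List.of("A"));
        inputsOf.put("C", List.of("B"));
        inputsOf.put("D", List.of("C"));
        inputsOf.put("G", List.of("C"));
        inputsOf.put("H", List.of("D", "G"));
        System.out.println(relationsNeedingSplit(inputsOf)); // prints [C]
    }
}
```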

MR plan:

POSort (scope-28) should be removed in
SecondaryKeyOptimizerUtil.java#applySecondaryKeySort<https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java#L329>,
but it is not removed in this case. SecondaryKeyOptimizer is only applied when
the group and the foreach are found in the same MapReduceOper, while the POSplit
(scope-19 in the physical plan) puts the group and the foreach into different
operators: C's Package ends up in the reduce plan of MapReduce node scope-58, and
D's foreach in the map plan of MapReduce node scope-64. So my question is: should
a new MR operator be generated when a POSplit is encountered? And if we do create
a new MapReduceOper for a POSplit, how should we handle this situation?

#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-58
Map Plan
C: Local Rearrange[tuple]{int}(false) - scope-17
|   |
|   Project[int][0] - scope-18
|
|---B: New For Each(false,false,false)[bag] - scope-14
    |   |
    |   Project[int][0] - scope-7
    |   |
    |   Project[bytearray][1] - scope-9
    |   |
    |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
    |   |
    |   |---Project[int][0] - scope-11
    |
    |---A: New For Each(false,false)[bag] - scope-6
        |   |
        |   Cast[int] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Project[bytearray][1] - scope-4
        |
        |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Reduce Plan
Store(hdfs://zly2.sh.intel.com:8020/tmp/temp523129898/tmp344582360:org.apache.pig.impl.io.InterStorage) - scope-59
|
|---C: Package(Packager)[tuple]{int} - scope-16--------
Global sort: false
----------------

MapReduce node scope-64
Map Plan
Union[tuple] - scope-65
|
|---H: Local Rearrange[tuple]{int}(false) - scope-50
|   |   |
|   |   Project[int][0] - scope-51
|   |
|   |---D: New For Each(false,false)[bag] - scope-32
|       |   |
|       |   Project[int][0] - scope-22
|       |   |
|       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
|       |   |
|       |   |---RelationToExpressionProject[bag][*] - scope-24
|       |       |
|       |       |---F: New For Each(false)[bag] - scope-31
|       |           |   |
|       |           |   Project[bytearray][1] - scope-29
|       |           |
|       |           |---E: POSort[bag]() - scope-28
|       |               |   |
|       |               |   Project[bytearray][1] - scope-27
|       |               |
|       |               |---Project[bag][1] - scope-26
|       |
|       |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp523129898/tmp344582360:org.apache.pig.impl.io.InterStorage) - scope-60
|
|---H: Local Rearrange[tuple]{int}(false) - scope-52
    |   |
    |   Project[int][0] - scope-53
    |
    |---G: New For Each(false,false)[bag] - scope-45
        |   |
        |   Project[int][0] - scope-35
        |   |
        |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
        |   |
        |   |---RelationToExpressionProject[bag][*] - scope-37
        |       |
        |       |---F: New For Each(false)[bag] - scope-44
        |           |   |
        |           |   Project[bytearray][1] - scope-42
        |           |
        |           |---E: POSort[bag]() - scope-41
        |               |   |
        |               |   Project[bytearray][1] - scope-40
        |               |
        |               |---Project[bag][1] - scope-39
        |
        |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp523129898/tmp344582360:org.apache.pig.impl.io.InterStorage) - scope-62--------
Reduce Plan
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: Package(JoinPackager(true,true))[tuple]{int} - scope-49--------
Global sort: false
----------------
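Here is a toy check of the condition described above: the secondary-key sort is only considered when one MapReduceOper's reduce plan has a Package directly feeding a ForEach that contains a nested POSort. The names are hypothetical and this is not SecondaryKeyOptimizerUtil's real logic (which also inspects sort keys and other operators). Applied to the two MR nodes above, neither qualifies, while a no-split version of C would.

```java
import java.util.List;

// Toy model (hypothetical, not Pig's API) of the precondition for secondary-key
// sort: a single MapReduceOper whose reduce plan has a Package feeding a
// ForEach that contains a nested POSort.
class SecondaryKeyConditionSketch {

    // Hypothetical stand-in for MapReduceOper; plans list operators bottom-up.
    static final class MROp {
        final String name;
        final List<String> mapPlan;
        final List<String> reducePlan;

        MROp(String name, List<String> mapPlan, List<String> reducePlan) {
            this.name = name;
            this.mapPlan = mapPlan;
            this.reducePlan = reducePlan;
        }
    }

    static boolean secondaryKeySortApplies(MROp op) {
        List<String> r = op.reducePlan;
        for (int i = 0; i + 1 < r.size(); i++) {
            boolean packageThenForEach =
                    r.get(i).startsWith("Package") && r.get(i + 1).startsWith("ForEach");
            if (packageThenForEach && r.get(i + 1).contains("POSort")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // MapReduce nodes scope-58 and scope-64 above, reduced to the relevant operators.
        MROp node58 = new MROp("scope-58", List.of("Load", "LocalRearrange(C)"),
                List.of("Package(C)", "Store(tmp)"));
        MROp node64 = new MROp("scope-64",
                List.of("Load(tmp)", "ForEach(D) with POSort", "LocalRearrange(H)"),
                List.of("Package(H)", "Store(H)"));
        // Hypothetical: what C would compile to if it had a single consumer (no split).
        MROp noSplit = new MROp("no-split", List.of("Load", "LocalRearrange(C)"),
                List.of("Package(C)", "ForEach(D) with POSort"));
        for (MROp op : List.of(node58, node64, noSplit)) {
            System.out.println(op.name + ": " + secondaryKeySortApplies(op));
        }
        // scope-58: false
        // scope-64: false
        // no-split: true
    }
}
```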



Kelly Zhang/Zhang,Liyun
Best Regards
