Hi all:
I have a question about POSplit. A new MR operator is generated whenever a
POSplit is encountered:
MRCompiler#visitSplit
public void visitSplit(POSplit op) throws VisitorException {
    try {
        FileSpec fSpec = op.getSplitStore();
        // Close the current MR operator with a store to the split's temp file
        MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
        mro.setSplitter(true);
        splitsSeen.put(op.getOperatorKey(), mro);
        // Start a new MR operator that reads from that temp file
        curMROp = startNew(fSpec, mro);
        phyToMROpMap.put(op, curMROp);
    } catch (Exception e) {
        int errCode = 2034;
        String msg = "Error compiling operator " +
                op.getClass().getSimpleName();
        throw new MRCompilerException(msg, errCode, PigException.BUG, e);
    }
}
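To make the control flow of visitSplit concrete, here is a toy model. The class SplitCutSketch and its string "operators" are hypothetical, not Pig classes, and the model ignores the map/reduce boundary inside an MR operator. It only shows the cut: everything before the Split is closed off with a temp store (roughly what endSingleInputPlanWithStr does), and a new operator is started from a load of that temp file (roughly what startNew does).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Pig's real MRCompiler classes.
public class SplitCutSketch {
    // Compile a linear chain of operator names into "MR operators" (lists of names).
    static List<List<String>> compile(List<String> chain) {
        List<List<String>> mrOps = new ArrayList<>();
        List<String> cur = new ArrayList<>();
        for (String op : chain) {
            if (op.equals("Split")) {
                cur.add("Store(tmp)");   // end the current MR operator with a temp store
                mrOps.add(cur);
                cur = new ArrayList<>();
                cur.add("Load(tmp)");    // the next MR operator starts by reading it back
            } else {
                cur.add(op);
            }
        }
        mrOps.add(cur);
        return mrOps;
    }

    public static void main(String[] args) {
        List<String> chain = List.of("Load", "ForEach", "LocalRearrange",
                "Package", "Split", "Filter", "ForEach(Sort)");
        // prints [[Load, ForEach, LocalRearrange, Package, Store(tmp)], [Load(tmp), Filter, ForEach(Sort)]]
        System.out.println(compile(chain));
    }
}
```

In this model the Package (the group) and the sorting ForEach always end up in different MR operators once a Split sits between them.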
Here is a Pig script showing a case where a new MR operator should not be
generated when a POSplit is encountered:
testAccumulator.join.pig
REGISTER /home/zly/prj/oss/kellyzly/pig/bin/myudfs.jar;
A = load './testAccumulator.txt' as (id:int,f);
B = foreach A generate id, f, id as t;
C = group B by id;
D = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};
G = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};
H = join D by group, G by group;
store H into 'testAccumulator.join.out';
explain H
Physical Plan:
An implicit POSplit is generated in the physical plan.
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
| |
| Project[bag][1] - scope-54
| |
| Project[bag][2] - scope-55
|
|---H: Package(Packager)[tuple]{int} - scope-49
|
|---H: Global Rearrange[tuple] - scope-48
|
|---H: Local Rearrange[tuple]{int}(false) - scope-50
| | |
| | Project[int][0] - scope-51
| |
| |---D: New For Each(false,false)[bag] - scope-32
| | |
| | Project[int][0] - scope-22
| | |
| | POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
| | |
| | |---RelationToExpressionProject[bag][*] - scope-24
| | |
| | |---F: New For Each(false)[bag] - scope-31
| | | |
| | | Project[bytearray][1] - scope-29
| | |
| | |---E: POSort[bag]() - scope-28
| | | |
| | | Project[bytearray][1] - scope-27
| | |
| | |---Project[bag][1] - scope-26
| |
| |---C: Filter[bag] - scope-20
| | |
| | Constant(true) - scope-21
| |
| |---C: Split - scope-19 // an implicit Split is generated
| |
| |---C: Package(Packager)[tuple]{int} - scope-16
| |
| |---C: Global Rearrange[tuple] - scope-15
| |
| |---C: Local Rearrange[tuple]{int}(false) - scope-17
| | |
| | Project[int][0] - scope-18
| |
| |---B: New For Each(false,false,false)[bag] - scope-14
| | |
| | Project[int][0] - scope-7
| | |
| | Project[bytearray][1] - scope-9
| | |
| | | POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
| | |
| | |---Project[int][0] - scope-11
| |
| |---A: New For Each(false,false)[bag] - scope-6
| | |
| | Cast[int] - scope-2
| | |
| | |---Project[bytearray][0] - scope-1
| | |
| | Project[bytearray][1] - scope-4
| |
| |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---H: Local Rearrange[tuple]{int}(false) - scope-52
| |
| Project[int][0] - scope-53
|
|---G: New For Each(false,false)[bag] - scope-45
| |
| Project[int][0] - scope-35
| |
| POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
| |
| |---RelationToExpressionProject[bag][*] - scope-37
| |
| |---F: New For Each(false)[bag] - scope-44
| | |
| | Project[bytearray][1] - scope-42
| |
| |---E: POSort[bag]() - scope-41
| | |
| | Project[bytearray][1] - scope-40
| |
| |---Project[bag][1] - scope-39
|
|---C: Filter[bag] - scope-33
| |
| Constant(true) - scope-34
|
|---C: Split - scope-19
|
|---C: Package(Packager)[tuple]{int} - scope-16
|
|---C: Global Rearrange[tuple] - scope-15
|
|---C: Local Rearrange[tuple]{int}(false) - scope-17
| |
| Project[int][0] - scope-18
|
|---B: New For Each(false,false,false)[bag] - scope-14
| |
| Project[int][0] - scope-7
| |
| Project[bytearray][1] - scope-9
| |
| | POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
| |
| |---Project[int][0] - scope-11
|
|---A: New For Each(false,false)[bag] - scope-6
| |
| Cast[int] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Project[bytearray][1] - scope-4
|
|---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
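The implicit split appears because relation C is consumed twice, once by D and once by G. Below is a simplified sketch of that condition, assuming the rule is "insert a split for any relation read by more than one downstream operator". ImplicitSplitSketch is a hypothetical name, and the real logic in Pig works on plan operators rather than a map of names.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model, not Pig's implementation.
public class ImplicitSplitSketch {
    // inputs maps each relation to the relations it reads from.
    static Set<String> needsSplit(Map<String, List<String>> inputs) {
        Map<String, Integer> consumers = new HashMap<>();
        for (List<String> ins : inputs.values()) {
            for (String in : ins) {
                consumers.merge(in, 1, Integer::sum);  // count readers of each relation
            }
        }
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Integer> e : consumers.entrySet()) {
            if (e.getValue() > 1) {
                result.add(e.getKey());  // read more than once: a split is needed
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Dataflow of testAccumulator.join.pig
        Map<String, List<String>> inputs = new LinkedHashMap<>();
        inputs.put("B", List.of("A"));
        inputs.put("C", List.of("B"));
        inputs.put("D", List.of("C"));
        inputs.put("G", List.of("C"));
        inputs.put("H", List.of("D", "G"));
        System.out.println(needsSplit(inputs)); // [C]
    }
}
```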
POSort (scope-28) should be removed by
SecondaryKeyOptimizerUtil.java#applySecondaryKeySort<https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java#L329>,
but it is not removed in this case: SecondaryKeyOptimizer is only applied
when a group followed by a foreach is found within one MapReduceOper, and
POSplit (scope-19 in the physical plan) puts the group and the foreach in
different MR operators. So my question is: should a new MR operator really be
generated when a POSplit is encountered? And if we do need to create a new
MapReduceOper for a POSplit, how should this situation be handled?

MR plan:
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-58
Map Plan
C: Local Rearrange[tuple]{int}(false) - scope-17
| |
| Project[int][0] - scope-18
|
|---B: New For Each(false,false,false)[bag] - scope-14
| |
| Project[int][0] - scope-7
| |
| Project[bytearray][1] - scope-9
| |
| POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
| |
| |---Project[int][0] - scope-11
|
|---A: New For Each(false,false)[bag] - scope-6
| |
| Cast[int] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Project[bytearray][1] - scope-4
|
|---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Reduce Plan
Store(hdfs://zly2.sh.intel.com:8020/tmp/temp523129898/tmp344582360:org.apache.pig.impl.io.InterStorage) - scope-59
|
|---C: Package(Packager)[tuple]{int} - scope-16--------
Global sort: false
----------------
MapReduce node scope-64
Map Plan
Union[tuple] - scope-65
|
|---H: Local Rearrange[tuple]{int}(false) - scope-50
| | |
| | Project[int][0] - scope-51
| |
| |---D: New For Each(false,false)[bag] - scope-32
| | |
| | Project[int][0] - scope-22
| | |
| | POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
| | |
| | |---RelationToExpressionProject[bag][*] - scope-24
| | |
| | |---F: New For Each(false)[bag] - scope-31
| | | |
| | | Project[bytearray][1] - scope-29
| | |
| | |---E: POSort[bag]() - scope-28
| | | |
| | | Project[bytearray][1] - scope-27
| | |
| | |---Project[bag][1] - scope-26
| |
|
|---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp523129898/tmp344582360:org.apache.pig.impl.io.InterStorage) - scope-60
|
|---H: Local Rearrange[tuple]{int}(false) - scope-52
| |
| Project[int][0] - scope-53
|
|---G: New For Each(false,false)[bag] - scope-45
| |
| Project[int][0] - scope-35
| |
| POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
| |
| |---RelationToExpressionProject[bag][*] - scope-37
| |
| |---F: New For Each(false)[bag] - scope-44
| | |
| | Project[bytearray][1] - scope-42
| |
| |---E: POSort[bag]() - scope-41
| | |
| | Project[bytearray][1] - scope-40
| |
| |---Project[bag][1] - scope-39
|
|---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp523129898/tmp344582360:org.apache.pig.impl.io.InterStorage) - scope-62--------
Reduce Plan
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: Package(JoinPackager(true,true))[tuple]{int} - scope-49--------
Global sort: false
----------------
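The precondition for the secondary-key optimization, that the group and the sorting foreach sit in one MapReduceOper, can be modelled in a few lines. This is a hypothetical sketch: SecondaryKeySketch is not part of Pig, and the real check in SecondaryKeyOptimizerUtil inspects physical operators rather than strings. Here the nested sort is treated as replaceable only when the grouping Package is directly followed by the sorting ForEach in the same reduce plan.

```java
import java.util.List;

// Hypothetical toy model of the precondition, not SecondaryKeyOptimizerUtil itself.
public class SecondaryKeySketch {
    // reducePlan lists operator names in execution order.
    static boolean canApply(List<String> reducePlan) {
        for (int i = 0; i + 1 < reducePlan.size(); i++) {
            boolean isGroup = reducePlan.get(i).startsWith("Package");
            boolean nextSorts = reducePlan.get(i + 1).equals("ForEach(Sort)");
            if (isGroup && nextSorts) {
                return true;  // group and nested sort share one reduce plan
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Without the split: group C and foreach D would share one reduce plan.
        System.out.println(canApply(List.of("Package(C)", "ForEach(Sort)", "Store"))); // true
        // With the split (scope-58): the reduce plan ends in a temp store.
        System.out.println(canApply(List.of("Package(C)", "Store(tmp)"))); // false
        // scope-64: the sort is in the map plan; the reduce plan has only the join package.
        System.out.println(canApply(List.of("Package(Join)", "Store"))); // false
    }
}
```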
Kelly Zhang/Zhang,Liyun
Best Regards