Hi all,
I'm currently working on PIG-4374<https://issues.apache.org/jira/browse/PIG-4374>(Add
SparkPlan in spark package). I ran into a problem with the following script in Spark mode.
Join.pig
A = load '/SkewedJoinInput1.txt' as (id,name,n);
B = load '/SkewedJoinInput2.txt' as (id,name);
C = group A by id;
D = group B by id;
E = join C by group, D by group;
store E into '/skewedjoin.out';
explain E;
The physical plan is compiled into an MR plan containing 3 MapReduce nodes
(see the attached mr_join.txt).
"LOCogroup" converts to POLocalRearrange, POGlobalRearrange and POPackage.
"LOJoin" converts to two POLocalRearranges, a POGlobalRearrange, a POPackage
and a POForEach.
In MapReduce mode, a MapReduceOper contains a mapPlan, a reducePlan and a
combinePlan. The relevant code is in:
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.visitLocalRearrange
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.addToMap
private void addToMap(PhysicalOperator op) throws PlanException, IOException {
    if (compiledInputs.length == 1) {
        // For speed
        MapReduceOper mro = compiledInputs[0];
        if (!mro.isMapDone()) {
            mro.mapPlan.addAsLeaf(op);
        } else if (mro.isMapDone() && !mro.isReduceDone()) {
            // MyComment: it first adds a POStore to mro.reducePlan so the mro
            // result is stored in a tmp file, then creates a new MROper that
            // contains a POLoad which loads the previous tmp file.
            FileSpec fSpec = getTempFileSpec();
            POStore st = getStore();
            st.setSFile(fSpec);
            mro.reducePlan.addAsLeaf(st);
            mro.setReduceDone(true);
            mro = startNew(fSpec, mro);
            mro.mapPlan.addAsLeaf(op);
            compiledInputs[0] = mro;
        } else {
            int errCode = 2022;
            String msg = "Both map and reduce phases have been done. This"
                    + " is unexpected while compiling.";
            throw new PlanException(msg, errCode, PigException.BUG);
        }
        curMROp = mro;
        ....
}
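To restate the splitting rule in addToMap without the Pig plumbing, here is a toy, self-contained sketch; the class and method names (SplitSketch, ToyOper) are mine, not Pig's, and the plans are just lists of operator names:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of MRCompiler.addToMap's rule: if the current job's map phase
// is sealed but its reduce phase is not, seal the reduce phase with a store
// to a temp file and start a fresh job whose map plan reloads that file.
public class SplitSketch {
    static class ToyOper {
        boolean mapDone, reduceDone;
        final List<String> mapPlan = new ArrayList<>();
        final List<String> reducePlan = new ArrayList<>();
    }

    // Returns the oper that now owns "op" in its map plan.
    static ToyOper addToMap(ToyOper cur, String op) {
        if (!cur.mapDone) {
            cur.mapPlan.add(op);                  // map phase still open: just append
            return cur;
        } else if (!cur.reduceDone) {
            cur.reducePlan.add("Store(tmpFile)"); // seal the old job via a temp store
            cur.reduceDone = true;
            ToyOper next = new ToyOper();         // start a new job (like startNew)
            next.mapPlan.add("Load(tmpFile)");    // ...that reloads the temp file
            next.mapPlan.add(op);
            return next;
        }
        throw new IllegalStateException("both map and reduce phases are done");
    }

    public static void main(String[] args) {
        ToyOper cur = new ToyOper();
        cur.mapDone = true; // pretend a blocking operator already closed the map
        ToyOper next = addToMap(cur, "POLocalRearrange");
        // prints: [Store(tmpFile)] -> [Load(tmpFile), POLocalRearrange]
        System.out.println(cur.reducePlan + " -> " + next.mapPlan);
    }
}
```

This temp-file handoff is exactly what produces the InterStorage loads/stores visible in the Map Reduce Plan below.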
In the SparkOper I created, there is only a single plan (no separate
map/reduce plans). How should I deal with the situation mentioned above?
Currently I handle it as follows:
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.visitLocalRearrange
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.addToMap
private void addToMap(POLocalRearrange op) throws PlanException, IOException {
    if (compiledInputs.length == 1) {
        SparkOper sparkOp = compiledInputs[0];
        // MyComment: first look up the predecessor of the POLocalRearrange.
        List<PhysicalOperator> preds = plan.getPredecessors(op);
        if (preds != null && preds.size() == 1) {
            // If the predecessor is not a POLoad (usually the predecessor of
            // a POLocalRearrange is a POLoad when using "group" or "join"),
            // add a POStore to sparkOp.plan so the sparkOp result is stored
            // in a tmp file, then create a new SparkOper that contains a
            // POLoad which loads the previous tmp file.
            if (!(preds.get(0) instanceof POLoad)) {
                FileSpec fSpec = getTempFileSpec();
                POStore st = getStore();
                st.setSFile(fSpec);
                sparkOp.plan.addAsLeaf(st);
                sparkOp = startNew(fSpec, sparkOp);
                compiledInputs[0] = sparkOp;
            }
        }
        sparkOp.plan.addAsLeaf(op);
        curSparkOp = sparkOp;
    } else {
    }
    .....
}
Can anyone tell me how Tez deals with this situation? I want to reference
the approach of other execution modes such as MapReduce and Tez.
Best regards
Zhang,Liyun
The explain output:
A = load '/SkewedJoinInput1.txt' as (id,name,n);
B = load '/SkewedJoinInput2.txt' as (id,name);
C = group A by id;
D = group B by id;
E = join C by group, D by group;
store E into '/skewedjoin.out';
explain E;
#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
E: (Name: LOStore Schema:
C::group#10:bytearray,C::A#23:bag{#29:tuple(id#10:bytearray,name#11:bytearray,n#12:bytearray)},D::group#15:bytearray,D::B#25:bag{#30:tuple(id#15:bytearray,name#16:bytearray)})
|
|---E: (Name: LOJoin(HASH) Schema:
C::group#10:bytearray,C::A#23:bag{#29:tuple(id#10:bytearray,name#11:bytearray,n#12:bytearray)},D::group#15:bytearray,D::B#25:bag{#30:tuple(id#15:bytearray,name#16:bytearray)})
| |
| group:(Name: Project Type: bytearray Uid: 10 Input: 0 Column: 0)
| |
| group:(Name: Project Type: bytearray Uid: 15 Input: 1 Column: 0)
|
|---C: (Name: LOCogroup Schema:
group#10:bytearray,A#23:bag{#29:tuple(id#10:bytearray,name#11:bytearray,n#12:bytearray)})
| | |
| | id:(Name: Project Type: bytearray Uid: 10 Input: 0 Column: 0)
| |
| |---A: (Name: LOLoad Schema:
id#10:bytearray,name#11:bytearray,n#12:bytearray)RequiredFields:[0, 1, 2]
|
|---D: (Name: LOCogroup Schema:
group#15:bytearray,B#25:bag{#30:tuple(id#15:bytearray,name#16:bytearray)})
| |
| id:(Name: Project Type: bytearray Uid: 15 Input: 0 Column: 0)
|
|---B: (Name: LOLoad Schema:
id#15:bytearray,name#16:bytearray)RequiredFields:[0, 1]
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-21
|
|---E: New For Each(true,true)[tuple] - scope-20
| |
| Project[bag][1] - scope-18
| |
| Project[bag][2] - scope-19
|
|---E: Package(Packager)[tuple]{bytearray} - scope-13
|
|---E: Global Rearrange[tuple] - scope-12
|
|---E: Local Rearrange[tuple]{bytearray}(false) - scope-14
| | |
| | Project[bytearray][0] - scope-15
| |
| |---C: Package(Packager)[tuple]{bytearray} - scope-2
| |
| |---C: Global Rearrange[tuple] - scope-1
| |
| |---C: Local Rearrange[tuple]{bytearray}(false) - scope-3
| | |
| | Project[bytearray][0] - scope-4
| |
| |---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---E: Local Rearrange[tuple]{bytearray}(false) - scope-16
| |
| Project[bytearray][0] - scope-17
|
|---D: Package(Packager)[tuple]{bytearray} - scope-7
|
|---D: Global Rearrange[tuple] - scope-6
|
|---D: Local Rearrange[tuple]{bytearray}(false) - scope-8
| |
| Project[bytearray][0] - scope-9
|
|---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-26
Map Plan
D: Local Rearrange[tuple]{bytearray}(false) - scope-8
| |
| Project[bytearray][0] - scope-9
|
|---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5--------
Reduce Plan
Store(hdfs://liyunzhangcentos.sh.intel.com:8020/tmp/temp1826267952/tmp-1291059779:org.apache.pig.impl.io.InterStorage) - scope-27
|
|---D: Package(Packager)[tuple]{bytearray} - scope-7--------
Global sort: false
----------------
MapReduce node scope-30
Map Plan
Union[tuple] - scope-31
|
|---E: Local Rearrange[tuple]{bytearray}(false) - scope-14
| | |
| | Project[bytearray][0] - scope-15
| |
| |---Load(hdfs://liyunzhangcentos.sh.intel.com:8020/tmp/temp1826267952/tmp-787675348:org.apache.pig.impl.io.InterStorage) - scope-24
|
|---E: Local Rearrange[tuple]{bytearray}(false) - scope-16
| |
| Project[bytearray][0] - scope-17
|
|---Load(hdfs://liyunzhangcentos.sh.intel.com:8020/tmp/temp1826267952/tmp-1291059779:org.apache.pig.impl.io.InterStorage) - scope-28--------
Reduce Plan
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-21
|
|---E: Package(JoinPackager(true,true))[tuple]{bytearray} - scope-13--------
Global sort: false
----------------
MapReduce node scope-22
Map Plan
C: Local Rearrange[tuple]{bytearray}(false) - scope-3
| |
| Project[bytearray][0] - scope-4
|
|---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Reduce Plan
Store(hdfs://liyunzhangcentos.sh.intel.com:8020/tmp/temp1826267952/tmp-787675348:org.apache.pig.impl.io.InterStorage) - scope-23
|
|---C: Package(Packager)[tuple]{bytearray} - scope-2--------
Global sort: false
----------------