Hi all:
I'm currently working on PIG-4374 <https://issues.apache.org/jira/browse/PIG-4374> (Add 
SparkPlan in spark package). I ran into a problem with the following script in Spark mode.
Join.pig
A = load '/SkewedJoinInput1.txt' as (id,name,n);
B = load '/SkewedJoinInput2.txt' as (id,name);
C = group A by id;
D = group B by id;
E = join C by group, D by group;
store E into '/skewedjoin.out';
explain E;

The physical plan is compiled into an MR plan that contains 3 MapReduce nodes 
(see attached mr_join.txt).
"LOCogroup" converts to "POLocalRearrange", "POGlobalRearrange", "POPackage";
"LOJoin" converts to "POLocalRearrange" (one per input), "POGlobalRearrange", 
"POPackage", "POForEach".

   In MapReduce mode, a MapReduceOper contains a mapPlan, a reducePlan and a 
combinePlan.
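   Roughly, the members used in addToMap below look like this (a simplified sketch based on the calls in the snippet, not the full MapReduceOper class):

   import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;

   // Simplified sketch of MapReduceOper; only the members referenced below are shown.
   public class MapReduceOper {
       public PhysicalPlan mapPlan = new PhysicalPlan();      // operators executed on the map side
       public PhysicalPlan combinePlan = new PhysicalPlan();  // optional combiner plan
       public PhysicalPlan reducePlan = new PhysicalPlan();   // operators executed on the reduce side

       private boolean mapDone = false;     // map phase already filled?
       private boolean reduceDone = false;  // reduce phase already filled?

       public boolean isMapDone() { return mapDone; }
       public boolean isReduceDone() { return reduceDone; }
       public void setReduceDone(boolean done) { reduceDone = done; }
   }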
   
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.visitLocalRearrange
   
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.addToMap
   private void addToMap(PhysicalOperator op) throws PlanException, IOException {
       if (compiledInputs.length == 1) {
           //For speed
           MapReduceOper mro = compiledInputs[0];
           if (!mro.isMapDone()) {
               mro.mapPlan.addAsLeaf(op);
           } else if (mro.isMapDone() && !mro.isReduceDone()) {
               FileSpec fSpec = getTempFileSpec();

               // MyComment: it first adds a POStore to mro.reducePlan so that the mro result
               // is stored in a tmp file, then creates a new MROper whose map plan contains
               // a POLoad that loads that tmp file.
               POStore st = getStore();
               st.setSFile(fSpec);
               mro.reducePlan.addAsLeaf(st);
               mro.setReduceDone(true);
               mro = startNew(fSpec, mro);
               mro.mapPlan.addAsLeaf(op);
               compiledInputs[0] = mro;
           } else {
               int errCode = 2022;
               String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
               throw new PlanException(msg, errCode, PigException.BUG);
           }
           curMROp = mro;

           ....
       }
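
   For clarity, this is roughly what the startNew(fSpec, mro) call boils down to in this path (a simplified sketch in my own words, not the actual MRCompiler code; MRPlan, scope and nig are fields of MRCompiler, and the real method does more than this):

   // Hypothetical simplification of startNew(fSpec, old): start a new MapReduceOper
   // whose map plan begins by loading the tmp file written by the previous operator.
   private MapReduceOper startNewSketch(FileSpec fSpec, MapReduceOper old) throws PlanException {
       POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
       ld.setLFile(fSpec);                       // read the tmp file produced by 'old'

       MapReduceOper ret = new MapReduceOper(new OperatorKey(scope, nig.getNextNodeId(scope)));
       ret.mapPlan.add(ld);                      // the new operator's map plan starts from that load

       MRPlan.add(ret);
       MRPlan.connect(old, ret);                 // keep the dependency old -> new in the MR plan
       return ret;
   }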

      In the SparkOper I created, there is only a single plan.
      How can I deal with the situation mentioned above? Currently I handle it in 
the following way:
      
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.visitLocalRearrange
      
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.addToMap
      private void addToMap(POLocalRearrange op) throws PlanException, IOException {
          if (compiledInputs.length == 1) {
              SparkOper sparkOp = compiledInputs[0];

              // MyComment: first look at the predecessor of the POLocalRearrange.
              List<PhysicalOperator> preds = plan.getPredecessors(op);
              if (preds != null && preds.size() == 1) {
                  // If the predecessor is not a POLoad (with "group"/"join" the predecessor of a
                  // POLocalRearrange is usually a POLoad), add a POStore to sparkOp.plan so that
                  // the sparkOp result is stored in a tmp file, then create a new SparkOper whose
                  // plan contains a POLoad that loads that tmp file.
                  if (!(preds.get(0) instanceof POLoad)) {
                      FileSpec fSpec = getTempFileSpec();
                      POStore st = getStore();
                      st.setSFile(fSpec);
                      sparkOp.plan.addAsLeaf(st);
                      sparkOp = startNew(fSpec, sparkOp);
                      compiledInputs[0] = sparkOp;
                  }
              }
              sparkOp.plan.addAsLeaf(op);
              curSparkOp = sparkOp;
          } else {
          }
              .....
          }
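
      For context, the SparkOper used above currently amounts to something like this (a rough sketch of my own class, showing only the field relevant here):

      // Sketch: unlike MapReduceOper there is no mapPlan/reducePlan/combinePlan split,
      // just one physical plan per Spark operator.
      public class SparkOper {
          public PhysicalPlan plan = new PhysicalPlan();  // all physical operators of this operator
          ....
      }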


Can anyone tell me how Tez deals with this situation? I want to reference how 
other execution modes such as MapReduce and Tez handle it.


Best regards
Zhang,Liyun

A = load '/SkewedJoinInput1.txt' as (id,name,n);
B = load '/SkewedJoinInput2.txt' as (id,name);
C = group A by id;
D = group B by id;
E = join C by group, D by group;
store E into '/skewedjoin.out';
explain E;



#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
E: (Name: LOStore Schema: C::group#10:bytearray,C::A#23:bag{#29:tuple(id#10:bytearray,name#11:bytearray,n#12:bytearray)},D::group#15:bytearray,D::B#25:bag{#30:tuple(id#15:bytearray,name#16:bytearray)})
|
|---E: (Name: LOJoin(HASH) Schema: C::group#10:bytearray,C::A#23:bag{#29:tuple(id#10:bytearray,name#11:bytearray,n#12:bytearray)},D::group#15:bytearray,D::B#25:bag{#30:tuple(id#15:bytearray,name#16:bytearray)})
    |   |
    |   group:(Name: Project Type: bytearray Uid: 10 Input: 0 Column: 0)
    |   |
    |   group:(Name: Project Type: bytearray Uid: 15 Input: 1 Column: 0)
    |
    |---C: (Name: LOCogroup Schema: group#10:bytearray,A#23:bag{#29:tuple(id#10:bytearray,name#11:bytearray,n#12:bytearray)})
    |   |   |
    |   |   id:(Name: Project Type: bytearray Uid: 10 Input: 0 Column: 0)
    |   |
    |   |---A: (Name: LOLoad Schema: id#10:bytearray,name#11:bytearray,n#12:bytearray)RequiredFields:[0, 1, 2]
    |
    |---D: (Name: LOCogroup Schema: group#15:bytearray,B#25:bag{#30:tuple(id#15:bytearray,name#16:bytearray)})
        |   |
        |   id:(Name: Project Type: bytearray Uid: 15 Input: 0 Column: 0)
        |
        |---B: (Name: LOLoad Schema: id#15:bytearray,name#16:bytearray)RequiredFields:[0, 1]
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-21
|
|---E: New For Each(true,true)[tuple] - scope-20
    |   |
    |   Project[bag][1] - scope-18
    |   |
    |   Project[bag][2] - scope-19
    |
    |---E: Package(Packager)[tuple]{bytearray} - scope-13
        |
        |---E: Global Rearrange[tuple] - scope-12
            |
            |---E: Local Rearrange[tuple]{bytearray}(false) - scope-14
            |   |   |
            |   |   Project[bytearray][0] - scope-15
            |   |
            |   |---C: Package(Packager)[tuple]{bytearray} - scope-2
            |       |
            |       |---C: Global Rearrange[tuple] - scope-1
            |           |
            |           |---C: Local Rearrange[tuple]{bytearray}(false) - scope-3
            |               |   |
            |               |   Project[bytearray][0] - scope-4
            |               |
            |               |---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
            |
            |---E: Local Rearrange[tuple]{bytearray}(false) - scope-16
                |   |
                |   Project[bytearray][0] - scope-17
                |
                |---D: Package(Packager)[tuple]{bytearray} - scope-7
                    |
                    |---D: Global Rearrange[tuple] - scope-6
                        |
                        |---D: Local Rearrange[tuple]{bytearray}(false) - scope-8
                            |   |
                            |   Project[bytearray][0] - scope-9
                            |
                            |---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5

#--------------------------------------------------
# Map Reduce Plan                                  
#--------------------------------------------------
MapReduce node scope-26
Map Plan
D: Local Rearrange[tuple]{bytearray}(false) - scope-8
|   |
|   Project[bytearray][0] - scope-9
|
|---B: Load(/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-5--------
Reduce Plan
Store(hdfs://liyunzhangcentos.sh.intel.com:8020/tmp/temp1826267952/tmp-1291059779:org.apache.pig.impl.io.InterStorage) - scope-27
|
|---D: Package(Packager)[tuple]{bytearray} - scope-7--------
Global sort: false
----------------

MapReduce node scope-30
Map Plan
Union[tuple] - scope-31
|
|---E: Local Rearrange[tuple]{bytearray}(false) - scope-14
|   |   |
|   |   Project[bytearray][0] - scope-15
|   |
|   |---Load(hdfs://liyunzhangcentos.sh.intel.com:8020/tmp/temp1826267952/tmp-787675348:org.apache.pig.impl.io.InterStorage) - scope-24
|
|---E: Local Rearrange[tuple]{bytearray}(false) - scope-16
    |   |
    |   Project[bytearray][0] - scope-17
    |
    |---Load(hdfs://liyunzhangcentos.sh.intel.com:8020/tmp/temp1826267952/tmp-1291059779:org.apache.pig.impl.io.InterStorage) - scope-28--------
Reduce Plan
E: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-21
|
|---E: Package(JoinPackager(true,true))[tuple]{bytearray} - scope-13--------
Global sort: false
----------------

MapReduce node scope-22
Map Plan
C: Local Rearrange[tuple]{bytearray}(false) - scope-3
|   |
|   Project[bytearray][0] - scope-4
|
|---A: Load(/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Reduce Plan
Store(hdfs://liyunzhangcentos.sh.intel.com:8020/tmp/temp1826267952/tmp-787675348:org.apache.pig.impl.io.InterStorage) - scope-23
|
|---C: Package(Packager)[tuple]{bytearray} - scope-2--------
Global sort: false
----------------
