Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1665404&r1=1665403&r2=1665404&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Tue Mar 10 04:37:36 2015
@@ -56,7 +56,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -69,601 +69,652 @@ import org.apache.pig.impl.plan.PlanExce
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
- * The compiler that compiles a given physical physicalPlan
- * into a DAG of Spark operators
+ * The compiler that compiles a given physical physicalPlan into a DAG of Spark
+ * operators
  */
 public class SparkCompiler extends PhyPlanVisitor {
-    private PigContext pigContext;
+       private PigContext pigContext;
 
-    //The physicalPlan that is being compiled
-    private PhysicalPlan physicalPlan;
+       // The physicalPlan that is being compiled
+       private PhysicalPlan physicalPlan;
 
-    //The physicalPlan of Spark Operators
-    private SparkOperPlan sparkPlan;
+       // The physicalPlan of Spark Operators
+       private SparkOperPlan sparkPlan;
 
-    private SparkOper curSparkOp;
-
-    private String scope;
-
-    private SparkOper[] compiledInputs  = null;
-
-    private Map<OperatorKey, SparkOper> splitsSeen;
-
-    private NodeIdGenerator nig;
-
-    private Map<PhysicalOperator,SparkOper> phyToSparkOpMap;
-    private UDFFinder udfFinder;
-
-    public SparkCompiler(PhysicalPlan physicalPlan,
-                      PigContext pigContext){
-        super(physicalPlan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));
-        this.physicalPlan = physicalPlan;
-        this.pigContext = pigContext;
-        this.sparkPlan = new SparkOperPlan();
-        this.phyToSparkOpMap = new HashMap<PhysicalOperator,SparkOper>();
-        this.udfFinder = new UDFFinder();
-        this.nig = NodeIdGenerator.getGenerator();
-        this.splitsSeen = new HashMap<OperatorKey, SparkOper>();
-
-    }
-
-    public void compile() throws IOException, PlanException, VisitorException {
-        List<PhysicalOperator> roots = physicalPlan.getRoots();
-        if((roots == null) || (roots.size() <= 0)) {
-            int errCode = 2053;
-            String msg = "Internal error. Did not find roots in the physical physicalPlan.";
-            throw new SparkCompilerException(msg, errCode, PigException.BUG);
-        }
-        scope = roots.get(0).getOperatorKey().getScope();
-        List<PhysicalOperator> leaves = physicalPlan.getLeaves();
-
-        if (!pigContext.inIllustrator)
-            for (PhysicalOperator op : leaves) {
-                if (!(op instanceof POStore)) {
-                    int errCode = 2025;
-                    String msg = "Expected leaf of reduce physicalPlan to " +
-                            "always be POStore. Found " + op.getClass().getSimpleName();
-                    throw new SparkCompilerException(msg, errCode, PigException.BUG);
-                }
-            }
-
-        // get all stores and nativeSpark operators, sort them in order(operator id)
-        // and compile their plans
-        List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan, POStore.class);
-        List<PONative> nativeSparks= PlanHelper.getPhysicalOperators(physicalPlan, PONative.class);
-        List<PhysicalOperator> ops;
-        if (!pigContext.inIllustrator) {
-            ops = new ArrayList<PhysicalOperator>(stores.size() + nativeSparks.size());
-            ops.addAll(stores);
-        } else {
-            ops = new ArrayList<PhysicalOperator>(leaves.size() + nativeSparks.size());
-            ops.addAll(leaves);
-        }
-        ops.addAll(nativeSparks);
-        Collections.sort(ops);
-
-        for (PhysicalOperator op : ops) {
-            compile(op);
-        }
-    }
-
-    /**
-     * Compiles the physicalPlan below op into a Spark Operator
-     * and stores it in curSparkOp.
-     * @param op
-     * @throws IOException
-     * @throws PlanException
-     * @throws VisitorException
-     */
-    private void compile(PhysicalOperator op) throws IOException, PlanException, VisitorException {
-        SparkOper[] prevCompInp = compiledInputs;
-
-        List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op);
-        if(op instanceof PONative){
-            // the predecessor (store) has already been processed
-            // don't process it again
-        }
-        else if (predecessors != null && predecessors.size() > 0) {
-            // When processing an entire script (multiquery), we can
-            // get into a situation where a load has
-            // predecessors. This means that it depends on some store
-            // earlier in the physicalPlan. We need to take that dependency
-            // and connect the respective Spark operators, while at the
-            // same time removing the connection between the Physical
-            // operators. That way the jobs will run in the right
-            // order.
-            if (op instanceof POLoad) {
-
-                if (predecessors.size() != 1) {
-                    int errCode = 2125;
-                    String msg = "Expected at most one predecessor of load. Got "+predecessors.size();
-                    throw new PlanException(msg, errCode, PigException.BUG);
-                }
-
-                PhysicalOperator p = predecessors.get(0);
-                SparkOper oper = null;
-                if(p instanceof POStore || p instanceof PONative){
-                    oper = phyToSparkOpMap.get(p);
-                }else{
-                    int errCode = 2126;
-                    String msg = "Predecessor of load should be a store or spark operator. Got "+p.getClass();
-                    throw new PlanException(msg, errCode, PigException.BUG);
-                }
-
-                // Need new operator
-                curSparkOp = getSparkOp();
-                curSparkOp.add(op);
-                sparkPlan.add(curSparkOp);
-                physicalPlan.disconnect(op, p);
-                sparkPlan.connect(oper, curSparkOp);
-                phyToSparkOpMap.put(op, curSparkOp);
-                return;
-            }
-
-            Collections.sort(predecessors);
-            compiledInputs = new SparkOper[predecessors.size()];
-            int i = -1;
-            for (PhysicalOperator pred : predecessors) {
-                if(pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())){
-                    compiledInputs[++i] = startNew(((POSplit)pred).getSplitStore(), splitsSeen.get(pred.getOperatorKey()));
-                    continue;
-                }
-                compile(pred);
-                compiledInputs[++i] = curSparkOp;
-            }
-        } else {
-            //No predecessors. Mostly a load. But this is where
-            //we start. We create a new sparkOp and add its first
-            //operator op. Also this should be added to the sparkPlan.
-            curSparkOp = getSparkOp();
-            curSparkOp.add(op);
-            if (op !=null && op instanceof POLoad)
-            {
-                if (((POLoad)op).getLFile()!=null && ((POLoad)op).getLFile().getFuncSpec()!=null)
-                    curSparkOp.UDFs.add(((POLoad)op).getLFile().getFuncSpec().toString());
-            }
-            sparkPlan.add(curSparkOp);
-            phyToSparkOpMap.put(op, curSparkOp);
-            return;
-        }
-        op.visit(this);
-        compiledInputs = prevCompInp;
-    }
-
-
-    private SparkOper getSparkOp() {
-        return new SparkOper(OperatorKey.genOpKey(scope));
-    }
-
-    public SparkOperPlan getSparkPlan() {
-        return sparkPlan;
-    }
-
-    public void connectSoftLink() throws PlanException, IOException {
-        for (PhysicalOperator op : physicalPlan) {
-            if (physicalPlan.getSoftLinkPredecessors(op)!=null) {
-                for (PhysicalOperator pred : physicalPlan.getSoftLinkPredecessors(op)) {
-                    SparkOper from = phyToSparkOpMap.get(pred);
-                    SparkOper to = phyToSparkOpMap.get(op);
-                    if (from==to)
-                        continue;
-                    if (sparkPlan.getPredecessors(to)==null || !sparkPlan.getPredecessors(to).contains(from)) {
-                        sparkPlan.connect(from, to);
-                    }
-                }
-            }
-        }
-    }
-
-    private SparkOper startNew(FileSpec fSpec, SparkOper old) throws PlanException{
-        POLoad ld = getLoad();
-        ld.setLFile(fSpec);
-        SparkOper ret = getSparkOp();
-        ret.add(ld);
-        sparkPlan.add(ret);
-        sparkPlan.connect(old, ret);
-        return ret;
-    }
-
-
-    private POLoad getLoad(){
-        POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        ld.setPc(pigContext);
-        ld.setIsTmpLoad(true);
-        return ld;
-    }
-
-    @Override
-    public void visitSplit(POSplit op) throws VisitorException{
-        try{
-            FileSpec fSpec = op.getSplitStore();
-            SparkOper sparkOp = endSingleInputPlanWithStr(fSpec);
-            sparkOp.setSplitter(true);
-            splitsSeen.put(op.getOperatorKey(), sparkOp);
-            curSparkOp = startNew(fSpec, sparkOp);
-            phyToSparkOpMap.put(op, curSparkOp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    public void visitDistinct(PODistinct op) throws VisitorException{
-        try{
-            nonBlocking(op);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    private SparkOper endSingleInputPlanWithStr(FileSpec fSpec) throws PlanException{
-        if(compiledInputs.length>1) {
-            int errCode = 2023;
-            String msg = "Received a multi input physicalPlan when expecting only a single input one.";
-            throw new PlanException(msg, errCode, PigException.BUG);
-        }
-        SparkOper sparkOp = compiledInputs[0];  //  Load
-        POStore str = getStore();
-        str.setSFile(fSpec);
-        sparkOp.plan.addAsLeaf(str);
-        return sparkOp;
-    }
-
-    private POStore getStore(){
-        POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        // mark store as tmp store. These could be removed by the
-        // optimizer, because it wasn't the user requesting it.
-        st.setIsTmpStore(true);
-        return st;
-    }
-
-    @Override
-    public void visitLoad(POLoad op) throws VisitorException{
-        try{
-            nonBlocking(op);
-            phyToSparkOpMap.put(op, curSparkOp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitNative(PONative op) throws VisitorException{
-        try{
-            SparkOper nativesparkOpper = getNativeSparkOp(op.getNativeMRjar(), op.getParams());
-            sparkPlan.add(nativesparkOpper);
-            sparkPlan.connect(curSparkOp, nativesparkOpper);
-            phyToSparkOpMap.put(op, nativesparkOpper);
-            curSparkOp = nativesparkOpper;
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    private NativeSparkOper getNativeSparkOp(String sparkJar, String[] parameters) {
-        return new NativeSparkOper(new OperatorKey(scope,nig.getNextNodeId(scope)), sparkJar, parameters);
-    }
-
-    @Override
-    public void visitStore(POStore op) throws VisitorException{
-        try{
-            nonBlocking(op);
-            phyToSparkOpMap.put(op, curSparkOp);
-            if (op.getSFile()!=null && op.getSFile().getFuncSpec()!=null)
-                curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString());
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitFilter(POFilter op) throws VisitorException{
-        try{
-            nonBlocking(op);
-            processUDFs(op.getPlan());
-            phyToSparkOpMap.put(op, curSparkOp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitCross(POCross op) throws VisitorException {
-        try {
-            nonBlocking(op);
-            phyToSparkOpMap.put(op, curSparkOp);
-        } catch (Exception e) {
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitStream(POStream op) throws VisitorException{
-        try{
-            POStreamSpark poStreamSpark = new POStreamSpark(op);
-            nonBlocking(poStreamSpark);
-            phyToSparkOpMap.put(op, curSparkOp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitSort(POSort op) throws  VisitorException{
-        try{
-            nonBlocking(op);
-            phyToSparkOpMap.put(op, curSparkOp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitLimit(POLimit op) throws VisitorException {
-        try {
-            nonBlocking(op);
-        } catch (Exception e) {
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
-        try{
-            nonBlocking(op);
-            List<PhysicalPlan> plans = op.getPlans();
-            if(plans!=null)
-                for(PhysicalPlan ep : plans)
-                    processUDFs(ep);
-            phyToSparkOpMap.put(op, curSparkOp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
-        List<PhysicalOperator> roots = curSparkOp.plan.getRoots();
-        if(roots.size() != 1){
-            int errCode = 2171;
-            String errMsg = "Expected one but found more then one root physical operator in physical physicalPlan.";
-            throw new SparkCompilerException(errMsg,errCode,PigException.BUG);
-        }
-
-        PhysicalOperator phyOp = roots.get(0);
-        if(! (phyOp instanceof POLoad)){
-            int errCode = 2172;
-            String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
-            throw new SparkCompilerException(errMsg,errCode,PigException.BUG);
-        }
-
-        LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc();
-        try {
-            if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
-                int errCode = 2249;
-                throw new SparkCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", errCode);
-            }
-            ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit();
-        } catch (SparkCompilerException e){
-            throw (e);
-        } catch (IOException e) {
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-
-        try{
-            nonBlocking(op);
-            phyToSparkOpMap.put(op, curSparkOp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitPOForEach(POForEach op) throws VisitorException{
-        try{
-            nonBlocking(op);
-            List<PhysicalPlan> plans = op.getInputPlans();
-            if (plans != null) {
-                for (PhysicalPlan ep : plans) {
-                    processUDFs(ep);
-                }
-            }
-            phyToSparkOpMap.put(op, curSparkOp);
-        } catch (Exception e) {
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
-        try{
-            blocking(op);
-            curSparkOp.customPartitioner = op.getCustomPartitioner();
-            phyToSparkOpMap.put(op, curSparkOp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitPackage(POPackage op) throws VisitorException{
-        try{
-            nonBlocking(op);
-            phyToSparkOpMap.put(op, curSparkOp);
-            if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) {
-                curSparkOp.markRegularJoin();
-            } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) {
-                if (op.getNumInps() == 1) {
-                    curSparkOp.markGroupBy();
-                } else if (op.getNumInps() > 1) {
-                    curSparkOp.markCogroup();
-                }
-            }
-
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void visitUnion(POUnion op) throws VisitorException{
-        try{
-            nonBlocking(op);
-            phyToSparkOpMap.put(op, curSparkOp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-
-    @Override
-    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
-     //TODO
-    }
-
-    @Override
-    public void visitFRJoin(POFRJoin op) throws VisitorException {
-      //TODO
-    }
-
-    @Override
-    public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
-      //TODO
-    }
-
-    private void processUDFs(PhysicalPlan plan) throws VisitorException{
-        if(plan!=null){
-            //Process Scalars (UDF with referencedOperators)
-            ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
-            scalarPhyFinder.visit();
-            curSparkOp.scalars.addAll(scalarPhyFinder.getScalars());
-
-            //Process UDFs
-            udfFinder.setPlan(plan);
-            udfFinder.visit();
-            curSparkOp.UDFs.addAll(udfFinder.getUDFs());
-        }
-    }
-
-    private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
-        SparkOper sparkOp=null;
-        if (compiledInputs.length == 1) {
-            sparkOp = compiledInputs[0];
-        } else {
-           sparkOp = merge(compiledInputs);
-        }
-        sparkOp.plan.addAsLeaf(op);
-        curSparkOp = sparkOp;
-    }
-
-    private void blocking(PhysicalOperator op) throws PlanException, IOException{
-        SparkOper sparkOp = getSparkOp();
-        sparkPlan.add(sparkOp);
-        for(SparkOper compileInput: compiledInputs){
-            sparkPlan.connect(compileInput, sparkOp);
-        }
-        sparkOp.plan.addAsLeaf(op);
-        curSparkOp = sparkOp;
-    }
-
-    private SparkOper merge(SparkOper[] compiledInputs)throws PlanException  {
-        SparkOper ret = getSparkOp();
-        sparkPlan.add(ret);
-
-        Set<SparkOper> toBeConnected = new HashSet<SparkOper>();
-        List<SparkOper> toBeRemoved = new ArrayList<SparkOper>();
-
-        List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>();
-
-        for (SparkOper sparkOp : compiledInputs) {
-             toBeRemoved.add(sparkOp);
-             toBeMerged.add(sparkOp.plan);
-             List<SparkOper> predecessors = sparkPlan.getPredecessors(sparkOp);
-             if( predecessors != null){
-                  for( SparkOper predecessorSparkOp: predecessors){
-                      toBeConnected.add(predecessorSparkOp);
-                  }
-             }
-        }
-        merge(ret.plan, toBeMerged);
-
-        Iterator<SparkOper> it = toBeConnected.iterator();
-        while(it.hasNext())
-            sparkPlan.connect(it.next(), ret);
-        for(SparkOper removeSparkOp : toBeRemoved){
-            if(removeSparkOp.requestedParallelism > ret.requestedParallelism)
-                ret.requestedParallelism = removeSparkOp.requestedParallelism;
-            for (String udf:removeSparkOp.UDFs)
-            {
-                if (!ret.UDFs.contains(udf))
-                    ret.UDFs.add(udf);
-            }
-            // We also need to change scalar marking
-            for(PhysicalOperator physOp: removeSparkOp.scalars) {
-                if(!ret.scalars.contains(physOp)) {
-                    ret.scalars.add(physOp);
-                }
-            }
-            Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>();
-            for (Map.Entry<PhysicalOperator, SparkOper> entry : phyToSparkOpMap.entrySet()) {
-                if (entry.getValue()==removeSparkOp) {
-                    opsToChange.add(entry.getKey());
-                }
-            }
-            for (PhysicalOperator op : opsToChange) {
-                phyToSparkOpMap.put(op, ret);
-            }
-
-            sparkPlan.remove(removeSparkOp);
-        }
-        return ret;
-    }
-
-    /**
-     * The merge of a list of plans into a single physicalPlan
-     * @param <O>
-     * @param <E>
-     * @param finPlan - Final Plan into which the list of plans is merged
-     * @param plans - list of plans to be merged
-     * @throws PlanException
-     */
-    private <O extends Operator<?>, E extends OperatorPlan<O>> void merge(
-            E finPlan, List<E> plans) throws PlanException {
-        for (E e : plans) {
-            finPlan.merge(e);
-        }
-    }
+       private SparkOperator curSparkOp;
+
+       private String scope;
+
+       private SparkOperator[] compiledInputs = null;
+
+       private Map<OperatorKey, SparkOperator> splitsSeen;
+
+       private NodeIdGenerator nig;
+
+       private Map<PhysicalOperator, SparkOperator> phyToSparkOpMap;
+       private UDFFinder udfFinder;
+
+       public SparkCompiler(PhysicalPlan physicalPlan, PigContext pigContext) {
+               super(physicalPlan,
+                               new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                                               physicalPlan));
+               this.physicalPlan = physicalPlan;
+               this.pigContext = pigContext;
+               this.sparkPlan = new SparkOperPlan();
+               this.phyToSparkOpMap = new HashMap<PhysicalOperator, SparkOperator>();
+               this.udfFinder = new UDFFinder();
+               this.nig = NodeIdGenerator.getGenerator();
+               this.splitsSeen = new HashMap<OperatorKey, SparkOperator>();
+
+       }
+
+       public void compile() throws IOException, PlanException, VisitorException {
+               List<PhysicalOperator> roots = physicalPlan.getRoots();
+               if ((roots == null) || (roots.size() <= 0)) {
+                       int errCode = 2053;
+                       String msg = "Internal error. Did not find roots in the physical physicalPlan.";
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG);
+               }
+               scope = roots.get(0).getOperatorKey().getScope();
+               List<PhysicalOperator> leaves = physicalPlan.getLeaves();
+
+               if (!pigContext.inIllustrator)
+                       for (PhysicalOperator op : leaves) {
+                               if (!(op instanceof POStore)) {
+                                       int errCode = 2025;
+                                       String msg = "Expected leaf of reduce physicalPlan to "
+                                                       + "always be POStore. Found "
+                                                       + op.getClass().getSimpleName();
+                                       throw new SparkCompilerException(msg, errCode,
+                                                       PigException.BUG);
+                               }
+                       }
+
+               // get all stores and nativeSpark operators, sort them in order(operator
+               // id)
+               // and compile their plans
+               List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan,
+                               POStore.class);
+               List<PONative> nativeSparks = PlanHelper.getPhysicalOperators(
+                               physicalPlan, PONative.class);
+               List<PhysicalOperator> ops;
+               if (!pigContext.inIllustrator) {
+                       ops = new ArrayList<PhysicalOperator>(stores.size()
+                                       + nativeSparks.size());
+                       ops.addAll(stores);
+               } else {
+                       ops = new ArrayList<PhysicalOperator>(leaves.size()
+                                       + nativeSparks.size());
+                       ops.addAll(leaves);
+               }
+               ops.addAll(nativeSparks);
+               Collections.sort(ops);
+
+               for (PhysicalOperator op : ops) {
+                       compile(op);
+               }
+       }
+
+       /**
+        * Compiles the physicalPlan below op into a Spark Operator and stores it in
+        * curSparkOp.
+        * 
+        * @param op
+        * @throws IOException
+        * @throws PlanException
+        * @throws VisitorException
+        */
+       private void compile(PhysicalOperator op) throws IOException,
+                       PlanException, VisitorException {
+               SparkOperator[] prevCompInp = compiledInputs;
+
+               List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op);
+               if (op instanceof PONative) {
+                       // the predecessor (store) has already been processed
+                       // don't process it again
+               } else if (predecessors != null && predecessors.size() > 0) {
+                       // When processing an entire script (multiquery), we can
+                       // get into a situation where a load has
+                       // predecessors. This means that it depends on some store
+                       // earlier in the physicalPlan. We need to take that dependency
+                       // and connect the respective Spark operators, while at the
+                       // same time removing the connection between the Physical
+                       // operators. That way the jobs will run in the right
+                       // order.
+                       if (op instanceof POLoad) {
+
+                               if (predecessors.size() != 1) {
+                                       int errCode = 2125;
+                                       String msg = "Expected at most one predecessor of load. Got "
+                                                       + predecessors.size();
+                                       throw new PlanException(msg, errCode, PigException.BUG);
+                               }
+
+                               PhysicalOperator p = predecessors.get(0);
+                               SparkOperator oper = null;
+                               if (p instanceof POStore || p instanceof PONative) {
+                                       oper = phyToSparkOpMap.get(p);
+                               } else {
+                                       int errCode = 2126;
+                                       String msg = "Predecessor of load should be a store or spark operator. Got "
+                                                       + p.getClass();
+                                       throw new PlanException(msg, errCode, PigException.BUG);
+                               }
+
+                               // Need new operator
+                               curSparkOp = getSparkOp();
+                               curSparkOp.add(op);
+                               sparkPlan.add(curSparkOp);
+                               physicalPlan.disconnect(op, p);
+                               sparkPlan.connect(oper, curSparkOp);
+                               phyToSparkOpMap.put(op, curSparkOp);
+                               return;
+                       }
+
+                       Collections.sort(predecessors);
+                       compiledInputs = new SparkOperator[predecessors.size()];
+                       int i = -1;
+                       for (PhysicalOperator pred : predecessors) {
+                               if (pred instanceof POSplit
+                                               && splitsSeen.containsKey(pred.getOperatorKey())) {
+                                       compiledInputs[++i] = startNew(
+                                                       ((POSplit) pred).getSplitStore(),
+                                                       splitsSeen.get(pred.getOperatorKey()));
+                                       continue;
+                               }
+                               compile(pred);
+                               compiledInputs[++i] = curSparkOp;
+                       }
+               } else {
+                       // No predecessors. Mostly a load. But this is where
+                       // we start. We create a new sparkOp and add its first
+                       // operator op. Also this should be added to the sparkPlan.
+                       curSparkOp = getSparkOp();
+                       curSparkOp.add(op);
+                       if (op != null && op instanceof POLoad) {
+                               if (((POLoad) op).getLFile() != null
+                                               && ((POLoad) op).getLFile().getFuncSpec() != null)
+                                       curSparkOp.UDFs.add(((POLoad) op).getLFile().getFuncSpec()
+                                                       .toString());
+                       }
+                       sparkPlan.add(curSparkOp);
+                       phyToSparkOpMap.put(op, curSparkOp);
+                       return;
+               }
+               op.visit(this);
+               compiledInputs = prevCompInp;
+       }
+
+       private SparkOperator getSparkOp() {
+               return new SparkOperator(OperatorKey.genOpKey(scope));
+       }
+
+       public SparkOperPlan getSparkPlan() {
+               return sparkPlan;
+       }
+
+       public void connectSoftLink() throws PlanException, IOException {
+               for (PhysicalOperator op : physicalPlan) {
+                       if (physicalPlan.getSoftLinkPredecessors(op) != null) {
+                               for (PhysicalOperator pred : physicalPlan
+                                               .getSoftLinkPredecessors(op)) {
+                                       SparkOperator from = phyToSparkOpMap.get(pred);
+                                       SparkOperator to = phyToSparkOpMap.get(op);
+                                       if (from == to)
+                                               continue;
+                                       if (sparkPlan.getPredecessors(to) == null
+                                                       || !sparkPlan.getPredecessors(to).contains(from)) {
+                                               sparkPlan.connect(from, to);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       private SparkOperator startNew(FileSpec fSpec, SparkOperator old)
+                       throws PlanException {
+               POLoad ld = getLoad();
+               ld.setLFile(fSpec);
+               SparkOperator ret = getSparkOp();
+               ret.add(ld);
+               sparkPlan.add(ret);
+               sparkPlan.connect(old, ret);
+               return ret;
+       }
+
+       private POLoad getLoad() {
+               POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
+               ld.setPc(pigContext);
+               ld.setIsTmpLoad(true);
+               return ld;
+       }
+
+       @Override
+       public void visitSplit(POSplit op) throws VisitorException {
+               try {
+                       FileSpec fSpec = op.getSplitStore();
+                       SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec);
+                       sparkOp.setSplitter(true);
+                       splitsSeen.put(op.getOperatorKey(), sparkOp);
+                       curSparkOp = startNew(fSpec, sparkOp);
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       public void visitDistinct(PODistinct op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       private SparkOperator endSingleInputPlanWithStr(FileSpec fSpec)
+                       throws PlanException {
+               if (compiledInputs.length > 1) {
+                       int errCode = 2023;
+                       String msg = "Received a multi input physicalPlan when expecting only a single input one.";
+                       throw new PlanException(msg, errCode, PigException.BUG);
+               }
+               SparkOperator sparkOp = compiledInputs[0]; // Load
+               POStore str = getStore();
+               str.setSFile(fSpec);
+               sparkOp.physicalPlan.addAsLeaf(str);
+               return sparkOp;
+       }
+
+       private POStore getStore() {
+               POStore st = new POStore(new OperatorKey(scope,
+                               nig.getNextNodeId(scope)));
+               // mark store as tmp store. These could be removed by the
+               // optimizer, because it wasn't the user requesting it.
+               st.setIsTmpStore(true);
+               return st;
+       }
+
+       @Override
+       public void visitLoad(POLoad op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitNative(PONative op) throws VisitorException {
+               try {
+                       SparkOperator nativesparkOpper = getNativeSparkOp(
+                                       op.getNativeMRjar(), op.getParams());
+                       sparkPlan.add(nativesparkOpper);
+                       sparkPlan.connect(curSparkOp, nativesparkOpper);
+                       phyToSparkOpMap.put(op, nativesparkOpper);
+                       curSparkOp = nativesparkOpper;
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       private NativeSparkOperator getNativeSparkOp(String sparkJar,
+                       String[] parameters) {
+               return new NativeSparkOperator(new OperatorKey(scope,
+                               nig.getNextNodeId(scope)), sparkJar, parameters);
+       }
+
+       @Override
+       public void visitStore(POStore op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+                       phyToSparkOpMap.put(op, curSparkOp);
+                       if (op.getSFile() != null && op.getSFile().getFuncSpec() != null)
+                               curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString());
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitFilter(POFilter op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+                       processUDFs(op.getPlan());
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitCross(POCross op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitStream(POStream op) throws VisitorException {
+               try {
+                       POStreamSpark poStreamSpark = new POStreamSpark(op);
+                       nonBlocking(poStreamSpark);
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitSort(POSort op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitLimit(POLimit op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitLocalRearrange(POLocalRearrange op)
+                       throws VisitorException {
+               try {
+                       nonBlocking(op);
+                       List<PhysicalPlan> plans = op.getPlans();
+                       if (plans != null)
+                               for (PhysicalPlan ep : plans)
+                                       processUDFs(ep);
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitCollectedGroup(POCollectedGroup op)
+                       throws VisitorException {
+               List<PhysicalOperator> roots = curSparkOp.physicalPlan.getRoots();
+               if (roots.size() != 1) {
+                       int errCode = 2171;
+                       String errMsg = "Expected one but found more then one root physical operator in physical physicalPlan.";
+                       throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
+               }
+
+               PhysicalOperator phyOp = roots.get(0);
+               if (!(phyOp instanceof POLoad)) {
+                       int errCode = 2172;
+                       String errMsg = "Expected physical operator at root to be POLoad. Found : "
+                                       + phyOp.getClass().getCanonicalName();
+                       throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
+               }
+
+               LoadFunc loadFunc = ((POLoad) phyOp).getLoadFunc();
+               try {
+                       if (!(CollectableLoadFunc.class.isAssignableFrom(loadFunc
+                                       .getClass()))) {
+                               int errCode = 2249;
+                               throw new SparkCompilerException(
+                                               "While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.",
+                                               errCode);
+                       }
+                       ((CollectableLoadFunc) loadFunc).ensureAllKeyInstancesInSameSplit();
+               } catch (SparkCompilerException e) {
+                       throw (e);
+               } catch (IOException e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+
+               try {
+                       nonBlocking(op);
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitPOForEach(POForEach op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+                       List<PhysicalPlan> plans = op.getInputPlans();
+                       if (plans != null) {
+                               for (PhysicalPlan ep : plans) {
+                                       processUDFs(ep);
+                               }
+                       }
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitGlobalRearrange(POGlobalRearrange op)
+                       throws VisitorException {
+               try {
+                       blocking(op);
+                       curSparkOp.customPartitioner = op.getCustomPartitioner();
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitPackage(POPackage op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+                       phyToSparkOpMap.put(op, curSparkOp);
+                       if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) {
+                               curSparkOp.markRegularJoin();
+                       } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) {
+                               if (op.getNumInps() == 1) {
+                                       curSparkOp.markGroupBy();
+                               } else if (op.getNumInps() > 1) {
+                                       curSparkOp.markCogroup();
+                               }
+                       }
+
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitUnion(POUnion op) throws VisitorException {
+               try {
+                       nonBlocking(op);
+                       phyToSparkOpMap.put(op, curSparkOp);
+               } catch (Exception e) {
+                       int errCode = 2034;
+                       String msg = "Error compiling operator "
+                                       + op.getClass().getSimpleName();
+                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+               }
+       }
+
+       @Override
+       public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+               // TODO
+       }
+
+       @Override
+       public void visitFRJoin(POFRJoin op) throws VisitorException {
+               // TODO
+       }
+
+       @Override
+       public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
+               // TODO
+       }
+
+       private void processUDFs(PhysicalPlan plan) throws VisitorException {
+               if (plan != null) {
+                       // Process Scalars (UDF with referencedOperators)
+                       ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
+                       scalarPhyFinder.visit();
+                       curSparkOp.scalars.addAll(scalarPhyFinder.getScalars());
+
+                       // Process UDFs
+                       udfFinder.setPlan(plan);
+                       udfFinder.visit();
+                       curSparkOp.UDFs.addAll(udfFinder.getUDFs());
+               }
+       }
+
+       private void nonBlocking(PhysicalOperator op) throws PlanException,
+                       IOException {
+               SparkOperator sparkOp = null;
+               if (compiledInputs.length == 1) {
+                       sparkOp = compiledInputs[0];
+               } else {
+                       sparkOp = merge(compiledInputs);
+               }
+               sparkOp.physicalPlan.addAsLeaf(op);
+               curSparkOp = sparkOp;
+       }
+
+       private void blocking(PhysicalOperator op) throws PlanException,
+                       IOException {
+               SparkOperator sparkOp = getSparkOp();
+               sparkPlan.add(sparkOp);
+               for (SparkOperator compileInput : compiledInputs) {
+                       sparkPlan.connect(compileInput, sparkOp);
+               }
+               sparkOp.physicalPlan.addAsLeaf(op);
+               curSparkOp = sparkOp;
+       }
+
+       private SparkOperator merge(SparkOperator[] compiledInputs)
+                       throws PlanException {
+               SparkOperator ret = getSparkOp();
+               sparkPlan.add(ret);
+
+               Set<SparkOperator> toBeConnected = new HashSet<SparkOperator>();
+               List<SparkOperator> toBeRemoved = new ArrayList<SparkOperator>();
+
+               List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>();
+
+               for (SparkOperator sparkOp : compiledInputs) {
+                       toBeRemoved.add(sparkOp);
+                       toBeMerged.add(sparkOp.physicalPlan);
+                       List<SparkOperator> predecessors = sparkPlan
+                                       .getPredecessors(sparkOp);
+                       if (predecessors != null) {
+                               for (SparkOperator predecessorSparkOp : predecessors) {
+                                       toBeConnected.add(predecessorSparkOp);
+                               }
+                       }
+               }
+               merge(ret.physicalPlan, toBeMerged);
+
+               Iterator<SparkOperator> it = toBeConnected.iterator();
+               while (it.hasNext())
+                       sparkPlan.connect(it.next(), ret);
+               for (SparkOperator removeSparkOp : toBeRemoved) {
+                       if (removeSparkOp.requestedParallelism > ret.requestedParallelism)
+                               ret.requestedParallelism = removeSparkOp.requestedParallelism;
+                       for (String udf : removeSparkOp.UDFs) {
+                               if (!ret.UDFs.contains(udf))
+                                       ret.UDFs.add(udf);
+                       }
+                       // We also need to change scalar marking
+                       for (PhysicalOperator physOp : removeSparkOp.scalars) {
+                               if (!ret.scalars.contains(physOp)) {
+                                       ret.scalars.add(physOp);
+                               }
+                       }
+                       Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>();
+                       for (Map.Entry<PhysicalOperator, SparkOperator> entry : phyToSparkOpMap
+                                       .entrySet()) {
+                               if (entry.getValue() == removeSparkOp) {
+                                       opsToChange.add(entry.getKey());
+                               }
+                       }
+                       for (PhysicalOperator op : opsToChange) {
+                               phyToSparkOpMap.put(op, ret);
+                       }
+
+                       sparkPlan.remove(removeSparkOp);
+               }
+               return ret;
+       }
+
+       /**
+        * The merge of a list of plans into a single physicalPlan
+        * 
+        * @param <O>
+        * @param <E>
+        * @param finPlan
+        *            - Final Plan into which the list of plans is merged
+        * @param plans
+        *            - list of plans to be merged
+        * @throws PlanException
+        */
+       private <O extends Operator<?>, E extends OperatorPlan<O>> void merge(
+                       E finPlan, List<E> plans) throws PlanException {
+               for (E e : plans) {
+                       finPlan.merge(e);
+               }
+       }
 }
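
For context, the renamed compiler is driven entirely through the public methods visible in this diff: the (PhysicalPlan, PigContext) constructor, compile(), connectSoftLink(), and getSparkPlan(). A minimal sketch of a caller, assuming only that API (the SparkPlanBuilder class itself is hypothetical, not part of this commit):

import java.io.IOException;

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;

// Hypothetical driver class, for illustration only.
public class SparkPlanBuilder {
    public static SparkOperPlan buildSparkPlan(PhysicalPlan physicalPlan,
            PigContext pigContext) throws IOException, PlanException,
            VisitorException {
        SparkCompiler compiler = new SparkCompiler(physicalPlan, pigContext);
        compiler.compile();         // compiles stores/native ops into SparkOperators
        compiler.connectSoftLink(); // adds edges for soft-link (e.g. scalar) dependencies
        return compiler.getSparkPlan();
    }
}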

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java?rev=1665404&r1=1665403&r2=1665404&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java Tue Mar 10 04:37:36 2015
@@ -24,15 +24,18 @@ import org.apache.pig.impl.plan.VisitorE
 /**
  * A visitor for the SparkOperPlan class
  */
-public class SparkOpPlanVisitor  extends PlanVisitor<SparkOper, SparkOperPlan> {
+public class SparkOpPlanVisitor extends
+               PlanVisitor<SparkOperator, SparkOperPlan> {
 
-    public SparkOpPlanVisitor(SparkOperPlan plan, PlanWalker<SparkOper, SparkOperPlan> walker) {
-        super(plan, walker);
-        // TODO Auto-generated constructor stub
-    }
+       public SparkOpPlanVisitor(SparkOperPlan plan,
+                       PlanWalker<SparkOperator, SparkOperPlan> walker) {
+               super(plan, walker);
+               // TODO Auto-generated constructor stub
+       }
 
-    public void visitSparkOp(SparkOper sparkOper) throws VisitorException {
-        // TODO Auto-generated method stub
-    }
+       public void visitSparkOp(SparkOperator sparkOperator)
+                       throws VisitorException {
+               // TODO Auto-generated method stub
+       }
 
 }
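
The renamed visitor stays a thin extension point: a subclass picks a walker
and overrides visitSparkOp. As a hypothetical example (built only on the
public API introduced in this commit), a pass that counts sampler jobs in a
compiled plan:

import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;

// Hypothetical pass: count the sampler jobs in a compiled Spark plan.
public class SamplerCounter extends SparkOpPlanVisitor {
    private int samplers = 0;

    public SamplerCounter(SparkOperPlan plan) {
        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
    }

    @Override
    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
        if (sparkOp.isSampler()) {
            samplers++;
        }
    }

    public int getSamplerCount() {
        return samplers;
    }
}

Calling new SamplerCounter(plan).visit() walks the DAG depth-first and visits
each SparkOperator exactly once.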

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java?rev=1665404&r1=1665403&r2=1665404&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java Tue Mar 10 04:37:36 2015
@@ -24,23 +24,23 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
- * A Plan used to create the plan of
- * Spark Operators
+ * A Plan used to create the physicalPlan of Spark Operators
  */
-public class SparkOperPlan extends OperatorPlan<SparkOper> {
+public class SparkOperPlan extends OperatorPlan<SparkOperator> {
 
-    @Override
-    public String toString() {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PrintStream ps = new PrintStream(baos);
-        SparkPrinter printer = new SparkPrinter(ps, this);
-        printer.setVerbose(true);
-        try {
-            printer.visit();
-        } catch (VisitorException e) {
-            // TODO Auto-generated catch block
-            throw new RuntimeException("Unable to get String representation of plan:"+e, e );
-        }
-        return baos.toString();
-    }
+       @Override
+       public String toString() {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               PrintStream ps = new PrintStream(baos);
+               SparkPrinter printer = new SparkPrinter(ps, this);
+               printer.setVerbose(true);
+               try {
+                       printer.visit();
+               } catch (VisitorException e) {
+                       // TODO Auto-generated catch block
+                       throw new RuntimeException(
+                                       "Unable to get String representation of plan:" + e, e);
+               }
+               return baos.toString();
+       }
 }
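
The toString above is the usual capture idiom: route a PrintStream into an
in-memory buffer, let the printer visit the plan, then materialize the
buffer. Stripped of the Pig types, the idiom is just:

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class CaptureIdiom {
    public static void main(String[] args) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(baos);
        ps.println("# Spark Plan"); // stand-in for SparkPrinter.visit() output
        ps.flush();                 // not strictly needed here, but cheap insurance
        System.out.print(baos.toString());
    }
}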

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1665404&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Tue Mar 10 04:37:36 2015
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * An operator model for a Spark job. Acts as a host to the plans that will
+ * execute in Spark.
+ */
+public class SparkOperator extends Operator<SparkOpPlanVisitor> {
+       private static enum OPER_FEATURE {
+               NONE,
+               // Indicate if this job is a sampling job
+               SAMPLER,
+               // Indicate if this job is a merge indexer
+               INDEXER,
+               // Indicate if this job is a group by job
+               GROUPBY,
+               // Indicate if this job is a cogroup job
+               COGROUP,
+               // Indicate if this job is a regular join job
+               HASHJOIN;
+       };
+
+       public PhysicalPlan physicalPlan;
+
+       public Set<String> UDFs;
+
+       /* Name of the Custom Partitioner used */
+       public String customPartitioner = null;
+
+       public Set<PhysicalOperator> scalars;
+
+       public boolean isUDFComparatorUsed = false;
+
+       public int requestedParallelism = -1;
+
+       private OPER_FEATURE feature = OPER_FEATURE.NONE;
+
+       private boolean splitter = false;
+
+       // Name of the partition file generated by sampling process,
+       // Used by Skewed Join
+       private String skewedJoinPartitionFile;
+
+       private boolean usingTypedComparator = false;
+
+       private boolean combineSmallSplits = true;
+
+       public SparkOperator(OperatorKey k) {
+               super(k);
+               physicalPlan = new PhysicalPlan();
+               UDFs = new HashSet<String>();
+               scalars = new HashSet<PhysicalOperator>();
+       }
+
+       @Override
+       public boolean supportsMultipleInputs() {
+               return true;
+       }
+
+       @Override
+       public boolean supportsMultipleOutputs() {
+               return true;
+       }
+
+       @Override
+       public String name() {
+               String udfStr = getUDFsAsStr();
+               StringBuilder sb = new StringBuilder("Spark" + "("
+                               + requestedParallelism + (udfStr.equals("") ? "" : ",")
+                               + udfStr + ")" + " - " + mKey.toString());
+               return sb.toString();
+       }
+
+       private String getUDFsAsStr() {
+               StringBuilder sb = new StringBuilder();
+               if (UDFs != null && UDFs.size() > 0) {
+                       for (String str : UDFs) {
+                               sb.append(str.substring(str.lastIndexOf('.') + 1));
+                               sb.append(',');
+                       }
+                       sb.deleteCharAt(sb.length() - 1);
+               }
+               return sb.toString();
+       }
+
+       public void add(PhysicalOperator physicalOper) {
+               this.physicalPlan.add(physicalOper);
+       }
+
+       @Override
+       public void visit(SparkOpPlanVisitor v) throws VisitorException {
+               v.visitSparkOp(this);
+       }
+
+       public boolean isGroupBy() {
+               return (feature == OPER_FEATURE.GROUPBY);
+       }
+
+       public void markGroupBy() {
+               feature = OPER_FEATURE.GROUPBY;
+       }
+
+       public boolean isCogroup() {
+               return (feature == OPER_FEATURE.COGROUP);
+       }
+
+       public void markCogroup() {
+               feature = OPER_FEATURE.COGROUP;
+       }
+
+       public boolean isRegularJoin() {
+               return (feature == OPER_FEATURE.HASHJOIN);
+       }
+
+       public void markRegularJoin() {
+               feature = OPER_FEATURE.HASHJOIN;
+       }
+
+       public int getRequestedParallelism() {
+               return requestedParallelism;
+       }
+
+       public void setSplitter(boolean spl) {
+               splitter = spl;
+       }
+
+       public boolean isSplitter() {
+               return splitter;
+       }
+
+       public boolean isSampler() {
+               return (feature == OPER_FEATURE.SAMPLER);
+       }
+
+       public void markSampler() {
+               feature = OPER_FEATURE.SAMPLER;
+       }
+
+       public void setSkewedJoinPartitionFile(String file) {
+               skewedJoinPartitionFile = file;
+       }
+
+       public String getSkewedJoinPartitionFile() {
+               return skewedJoinPartitionFile;
+       }
+
+       protected boolean usingTypedComparator() {
+               return usingTypedComparator;
+       }
+
+       protected void useTypedComparator(boolean useTypedComparator) {
+               this.usingTypedComparator = useTypedComparator;
+       }
+
+       protected void noCombineSmallSplits() {
+               combineSmallSplits = false;
+       }
+
+       public boolean combineSmallSplits() {
+               return combineSmallSplits;
+       }
+
+       public boolean isIndexer() {
+               return (feature == OPER_FEATURE.INDEXER);
+       }
+
+       public void markIndexer() {
+               feature = OPER_FEATURE.INDEXER;
+       }
+}
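
One design note on the new class: feature is a single enum field, so a
SparkOperator can carry at most one of SAMPLER, INDEXER, GROUPBY, COGROUP or
HASHJOIN at a time, and marking a second feature silently replaces the first.
A small hypothetical driver showing that lifecycle:

import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.plan.OperatorKey;

public class FeatureDemo {
    public static void main(String[] args) {
        SparkOperator op = new SparkOperator(new OperatorKey("scope", 1L));
        op.markGroupBy();
        System.out.println(op.isGroupBy()); // true
        op.markSampler();                   // replaces GROUPBY
        System.out.println(op.isGroupBy()); // false
        System.out.println(op.isSampler()); // true
        System.out.println(op.name());      // e.g. "Spark(-1) - scope-1"
    }
}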

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1665404&r1=1665403&r2=1665404&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Tue Mar 10 04:37:36 2015
@@ -34,132 +34,141 @@ import org.apache.pig.impl.plan.optimize
 import org.apache.pig.impl.util.Pair;
 
 /**
- * This visitor visits the SparkPlan and does the following
- * for each SparkOper
- * - visits the POPackage in the plan and finds the corresponding
- * POLocalRearrange(s). It then annotates the POPackage
- * with information about which columns in the "value" are present in the
- * "key" and will need to stitched in to the "value"
+ * This visitor visits the SparkPlan and does the following for each
+ * SparkOperator - visits the POPackage in the plan and finds the corresponding
+ * POLocalRearrange(s). It then annotates the POPackage with information about
+ * which columns in the "value" are present in the "key" and will need to be
+ * stitched into the "value"
  */
 public class SparkPOPackageAnnotator extends SparkOpPlanVisitor {
-    public SparkPOPackageAnnotator(SparkOperPlan plan) {
-        super(plan, new DepthFirstWalker<SparkOper, SparkOperPlan>(plan));
-    }
-
-    @Override
-    public void visitSparkOp(SparkOper sparkOp) throws VisitorException {
-        if(!sparkOp.plan.isEmpty()) {
-            PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(sparkOp.plan);
-            pkgDiscoverer.visit();
-            POPackage pkg = pkgDiscoverer.getPkg();
-            if(pkg != null) {
-                handlePackage(sparkOp, pkg);
-            }
-        }
-    }
-
-    private void handlePackage(SparkOper pkgSparkOp, POPackage pkg) throws VisitorException {
-        int lrFound = 0;
-        List<SparkOper> predecessors = this.mPlan.getPredecessors(pkgSparkOp);
-        if (predecessors != null && predecessors.size() > 0) {
-            for (SparkOper pred : predecessors) {
-                lrFound += patchPackage(pred, pkgSparkOp, pkg);
-                if(lrFound == pkg.getNumInps()) {
-                    break;
-                }
-            }
-        }
-        if (lrFound != pkg.getNumInps()) {
-            int errCode = 2086;
-            String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
-            throw new OptimizerException(msg, errCode, PigException.BUG);
-        }
-    }
-
-    private int patchPackage(SparkOper pred , SparkOper pkgSparkOp, POPackage pkg) throws VisitorException {
-        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(pred.plan, pkg);
-        lrDiscoverer.visit();
-        // let our caller know if we managed to patch
-        // the package
-        return lrDiscoverer.getLoRearrangeFound();
-    }
-
-
-    static class PackageDiscoverer extends PhyPlanVisitor {
-
-        private POPackage pkg;
-
-        public PackageDiscoverer(PhysicalPlan plan) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
-        }
-
-        @Override
-        public void visitPackage(POPackage pkg) throws VisitorException {
-            this.pkg = pkg;
-        };
-
-        /**
-         * @return the pkg
-         */
-        public POPackage getPkg() {
-            return pkg;
-        }
-
-    }
-
-
-    static class LoRearrangeDiscoverer extends PhyPlanVisitor {
-
-        private int loRearrangeFound = 0;
-        private POPackage pkg;
-
-        public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
-            this.pkg = pkg;
-        }
-
-        @Override
-        public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
-            loRearrangeFound++;
-            Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
-
-            if (pkg.getPkgr() instanceof LitePackager) {
-                if(lrearrange.getIndex() != 0) {
-                    // Throw some exception here
-                    throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
-                }
-            }
-
-            // annotate the package with information from the LORearrange
-            // update the keyInfo information if already present in the POPackage
-            keyInfo = pkg.getPkgr().getKeyInfo();
-            if(keyInfo == null)
-                keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
-
-            if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
-                // something is wrong - we should not be getting key info
-                // for the same index from two different Local Rearranges
-                int errCode = 2087;
-                String msg = "Unexpected problem during optimization." +
-                        " Found index:" + lrearrange.getIndex() +
-                        " in multiple LocalRearrange operators.";
-                throw new OptimizerException(msg, errCode, PigException.BUG);
-
-            }
-            keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
-                    new Pair<Boolean, Map<Integer, Integer>>(
-                            lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
-            pkg.getPkgr().setKeyInfo(keyInfo);
-            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
-            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
-        }
-
-        /**
-         * @return the loRearrangeFound
-         */
-        public int getLoRearrangeFound() {
-            return loRearrangeFound;
-        }
+       public SparkPOPackageAnnotator(SparkOperPlan plan) {
+               super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+       }
+
+       @Override
+       public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+               if (!sparkOp.physicalPlan.isEmpty()) {
+                       PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(
+                                       sparkOp.physicalPlan);
+                       pkgDiscoverer.visit();
+                       POPackage pkg = pkgDiscoverer.getPkg();
+                       if (pkg != null) {
+                               handlePackage(sparkOp, pkg);
+                       }
+               }
+       }
+
+       private void handlePackage(SparkOperator pkgSparkOp, POPackage pkg)
+                       throws VisitorException {
+               int lrFound = 0;
+               List<SparkOperator> predecessors = this.mPlan
+                               .getPredecessors(pkgSparkOp);
+               if (predecessors != null && predecessors.size() > 0) {
+                       for (SparkOperator pred : predecessors) {
+                               lrFound += patchPackage(pred, pkgSparkOp, pkg);
+                               if (lrFound == pkg.getNumInps()) {
+                                       break;
+                               }
+                       }
+               }
+               if (lrFound != pkg.getNumInps()) {
+                       int errCode = 2086;
+                       String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
+                       throw new OptimizerException(msg, errCode, PigException.BUG);
+               }
+       }
+
+       private int patchPackage(SparkOperator pred, SparkOperator pkgSparkOp,
+                       POPackage pkg) throws VisitorException {
+               LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
+                               pred.physicalPlan, pkg);
+               lrDiscoverer.visit();
+               // let our caller know if we managed to patch
+               // the package
+               return lrDiscoverer.getLoRearrangeFound();
+       }
+
+       static class PackageDiscoverer extends PhyPlanVisitor {
+
+               private POPackage pkg;
+
+               public PackageDiscoverer(PhysicalPlan plan) {
+                       super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                                       plan));
+               }
+
+               @Override
+               public void visitPackage(POPackage pkg) throws VisitorException {
+                       this.pkg = pkg;
+               };
+
+               /**
+                * @return the pkg
+                */
+               public POPackage getPkg() {
+                       return pkg;
+               }
+
+       }
+
+       static class LoRearrangeDiscoverer extends PhyPlanVisitor {
+
+               private int loRearrangeFound = 0;
+               private POPackage pkg;
+
+               public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
+                       super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                                       plan));
+                       this.pkg = pkg;
+               }
+
+               @Override
+               public void visitLocalRearrange(POLocalRearrange lrearrange)
+                               throws VisitorException {
+                       loRearrangeFound++;
+                       Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+                       if (pkg.getPkgr() instanceof LitePackager) {
+                               if (lrearrange.getIndex() != 0) {
+                                       // Throw some exception here
+                                       throw new RuntimeException(
+                                                       "POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "
+                                                                       + lrearrange.getIndex());
+                               }
+                       }
+
+                       // annotate the package with information from the LORearrange
+                       // update the keyInfo information if already present in the
+                       // POPackage
+                       keyInfo = pkg.getPkgr().getKeyInfo();
+                       if (keyInfo == null)
+                               keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+
+                       if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+                               // something is wrong - we should not be getting key info
+                               // for the same index from two different Local Rearranges
+                               int errCode = 2087;
+                               String msg = "Unexpected problem during optimization."
+                                               + " Found index:" + lrearrange.getIndex()
+                                               + " in multiple LocalRearrange operators.";
+                               throw new OptimizerException(msg, errCode, PigException.BUG);
+
+                       }
+                       keyInfo.put(
+                                       Integer.valueOf(lrearrange.getIndex()),
+                                       new Pair<Boolean, Map<Integer, Integer>>(lrearrange
+                                                       .isProjectStar(), lrearrange.getProjectedColsMap()));
+                       pkg.getPkgr().setKeyInfo(keyInfo);
+                       pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+                       pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+               }
+
+               /**
+                * @return the loRearrangeFound
+                */
+               public int getLoRearrangeFound() {
+                       return loRearrangeFound;
+               }
 
-    }
+       }
 }
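
For readers decoding the annotation assembled here: keyInfo maps each
LocalRearrange input index to a pair of (project-star flag, projected-columns
map), which the Packager later uses to stitch key columns back into the
value. A minimal illustration of the shape, using Pig's Pair class but
hypothetical column numbers:

import java.util.HashMap;
import java.util.Map;

import org.apache.pig.impl.util.Pair;

public class KeyInfoShape {
    public static void main(String[] args) {
        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo =
                new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
        // Hypothetical: input 0's key is not a project-star, and one value
        // column is already present in the key per the projected-cols map.
        Map<Integer, Integer> projectedCols = new HashMap<Integer, Integer>();
        projectedCols.put(1, 0);
        keyInfo.put(Integer.valueOf(0),
                new Pair<Boolean, Map<Integer, Integer>>(Boolean.FALSE, projectedCols));
        System.out.println(keyInfo); // prints the assembled mapping
    }
}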

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1665404&r1=1665403&r2=1665404&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java Tue Mar 10 04:37:36 2015
@@ -22,7 +22,7 @@ import java.io.PrintStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -31,36 +31,36 @@ import org.apache.pig.impl.plan.VisitorE
  */
 public class SparkPrinter extends SparkOpPlanVisitor {
 
+       private PrintStream mStream = null;
+       private boolean isVerbose = true;
 
-    private PrintStream mStream = null;
-    private boolean isVerbose = true;
-
-    public SparkPrinter(PrintStream ps, SparkOperPlan plan) {
-        super(plan, new DepthFirstWalker<SparkOper, SparkOperPlan>(plan));
-        mStream = ps;
-        mStream.println("#--------------------------------------------------");
-        mStream.println("# Spark Plan                                  ");
-        mStream.println("#--------------------------------------------------");
-    }
-
-    public void setVerbose(boolean verbose) {
-        isVerbose = verbose;
-    }
-
-    @Override
-    public void visitSparkOp(SparkOper sparkOp) throws VisitorException {
-        mStream.println("");
-        mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
-        if(sparkOp instanceof NativeSparkOper) {
-            mStream.println("--------");
-            mStream.println();
-            return;
-        }
-        if (sparkOp.plan != null && sparkOp.plan.size() > 0) {
-            PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(sparkOp.plan, mStream);
-            printer.setVerbose(isVerbose);
-            printer.visit();
-            mStream.println("--------");
-        }
-    }
+       public SparkPrinter(PrintStream ps, SparkOperPlan plan) {
+               super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+               mStream = ps;
+               mStream.println("#--------------------------------------------------");
+               mStream.println("# Spark Plan                                  ");
+               mStream.println("#--------------------------------------------------");
+       }
+
+       public void setVerbose(boolean verbose) {
+               isVerbose = verbose;
+       }
+
+       @Override
+       public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+               mStream.println("");
+               mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
+               if (sparkOp instanceof NativeSparkOperator) {
+                       mStream.println("--------");
+                       mStream.println();
+                       return;
+               }
+               if (sparkOp.physicalPlan != null && sparkOp.physicalPlan.size() > 0) {
+                       PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(
+                                       sparkOp.physicalPlan, mStream);
+                       printer.setVerbose(isVerbose);
+                       printer.visit();
+                       mStream.println("--------");
+               }
+       }
 }
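
A hypothetical driver for the printer, assuming a SparkOperPlan already
produced by SparkCompiler:

import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
import org.apache.pig.impl.plan.VisitorException;

public class DumpSparkPlan {
    // Dump a compiled plan to stdout; the verbose flag is forwarded to
    // PlanPrinter and controls how much of each nested physical plan prints.
    static void dump(SparkOperPlan plan, boolean verbose) throws VisitorException {
        SparkPrinter printer = new SparkPrinter(System.out, plan);
        printer.setVerbose(verbose);
        printer.visit();
    }
}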

