Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Wed Apr 12 02:20:20 2017
@@ -25,8 +25,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Properties;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 
@@ -112,53 +112,53 @@ public class SparkCompiler extends PhyPl
     private static final Log LOG = LogFactory.getLog(SparkCompiler.class);
 
     private PigContext pigContext;
-       private Properties pigProperties;
+    private Properties pigProperties;
 
-       // The physicalPlan that is being compiled
-       private PhysicalPlan physicalPlan;
+    // The physicalPlan that is being compiled
+    private PhysicalPlan physicalPlan;
 
-       // The physicalPlan of Spark Operators
-       private SparkOperPlan sparkPlan;
+    // The physicalPlan of Spark Operators
+    private SparkOperPlan sparkPlan;
 
-       private SparkOperator curSparkOp;
+    private SparkOperator curSparkOp;
 
-       private String scope;
+    private String scope;
 
-       private SparkOperator[] compiledInputs = null;
+    private SparkOperator[] compiledInputs = null;
 
-       private Map<OperatorKey, SparkOperator> splitsSeen;
+    private Map<OperatorKey, SparkOperator> splitsSeen;
 
-       private NodeIdGenerator nig;
+    private NodeIdGenerator nig;
 
-       private Map<PhysicalOperator, SparkOperator> phyToSparkOpMap;
-       private UDFFinder udfFinder;
+    private Map<PhysicalOperator, SparkOperator> phyToSparkOpMap;
+    private UDFFinder udfFinder;
 
-       public SparkCompiler(PhysicalPlan physicalPlan, PigContext pigContext) {
-               super(physicalPlan,
-                               new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                                               physicalPlan));
-               this.physicalPlan = physicalPlan;
-               this.pigContext = pigContext;
-               this.pigProperties = pigContext.getProperties();
-               this.sparkPlan = new SparkOperPlan();
-               this.phyToSparkOpMap = new HashMap<PhysicalOperator, SparkOperator>();
-               this.udfFinder = new UDFFinder();
-               this.nig = NodeIdGenerator.getGenerator();
-               this.splitsSeen = new HashMap<OperatorKey, SparkOperator>();
+    public SparkCompiler(PhysicalPlan physicalPlan, PigContext pigContext) {
+        super(physicalPlan,
+                new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                        physicalPlan));
+        this.physicalPlan = physicalPlan;
+        this.pigContext = pigContext;
+        this.pigProperties = pigContext.getProperties();
+        this.sparkPlan = new SparkOperPlan();
+        this.phyToSparkOpMap = new HashMap<PhysicalOperator, SparkOperator>();
+        this.udfFinder = new UDFFinder();
+        this.nig = NodeIdGenerator.getGenerator();
+        this.splitsSeen = new HashMap<OperatorKey, SparkOperator>();
 
-       }
+    }
 
-       public void compile() throws IOException, PlanException, VisitorException {
-               List<PhysicalOperator> roots = physicalPlan.getRoots();
-               if ((roots == null) || (roots.size() <= 0)) {
-                       int errCode = 2053;
-                       String msg = "Internal error. Did not find roots in the physical physicalPlan.";
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG);
-               }
-               scope = roots.get(0).getOperatorKey().getScope();
-               List<PhysicalOperator> leaves = physicalPlan.getLeaves();
+    public void compile() throws IOException, PlanException, VisitorException {
+        List<PhysicalOperator> roots = physicalPlan.getRoots();
+        if ((roots == null) || (roots.size() <= 0)) {
+            int errCode = 2053;
+            String msg = "Internal error. Did not find roots in the physical physicalPlan.";
+            throw new SparkCompilerException(msg, errCode, PigException.BUG);
+        }
+        scope = roots.get(0).getOperatorKey().getScope();
+        List<PhysicalOperator> leaves = physicalPlan.getLeaves();
 
-               if (!pigContext.inIllustrator) {
+        if (!pigContext.inIllustrator) {
             for (PhysicalOperator op : leaves) {
                 if (!(op instanceof POStore)) {
                     int errCode = 2025;
@@ -171,321 +171,321 @@ public class SparkCompiler extends PhyPl
             }
         }
 
-               // get all stores and nativeSpark operators, sort them in order(operator
-               // id)
-               // and compile their plans
-               List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan,
-                               POStore.class);
-               List<PONative> nativeSparks = PlanHelper.getPhysicalOperators(
-                               physicalPlan, PONative.class);
-               List<PhysicalOperator> ops;
-               if (!pigContext.inIllustrator) {
-                       ops = new ArrayList<PhysicalOperator>(stores.size()
-                                       + nativeSparks.size());
-                       ops.addAll(stores);
-               } else {
-                       ops = new ArrayList<PhysicalOperator>(leaves.size()
-                                       + nativeSparks.size());
-                       ops.addAll(leaves);
-               }
-               ops.addAll(nativeSparks);
-               Collections.sort(ops);
+        // get all stores and nativeSpark operators, sort them in order(operator
+        // id)
+        // and compile their plans
+        List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan,
+                POStore.class);
+        List<PONative> nativeSparks = PlanHelper.getPhysicalOperators(
+                physicalPlan, PONative.class);
+        List<PhysicalOperator> ops;
+        if (!pigContext.inIllustrator) {
+            ops = new ArrayList<PhysicalOperator>(stores.size()
+                    + nativeSparks.size());
+            ops.addAll(stores);
+        } else {
+            ops = new ArrayList<PhysicalOperator>(leaves.size()
+                    + nativeSparks.size());
+            ops.addAll(leaves);
+        }
+        ops.addAll(nativeSparks);
+        Collections.sort(ops);
 
-               for (PhysicalOperator op : ops) {
+        for (PhysicalOperator op : ops) {
             if (LOG.isDebugEnabled())
                 LOG.debug("Starting compile of leaf-level operator " + op);
             compile(op);
-               }
-       }
+        }
+    }
 
-       /**
-        * Compiles the physicalPlan below op into a Spark Operator and stores it in
-        * curSparkOp.
-        * 
-        * @param op
-        * @throws IOException
-        * @throws PlanException
-        * @throws VisitorException
-        */
-       private void compile(PhysicalOperator op) throws IOException,
-                       PlanException, VisitorException {
-               SparkOperator[] prevCompInp = compiledInputs;
+    /**
+     * Compiles the physicalPlan below op into a Spark Operator and stores it in
+     * curSparkOp.
+     * 
+     * @param op
+     * @throws IOException
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    private void compile(PhysicalOperator op) throws IOException,
+            PlanException, VisitorException {
+        SparkOperator[] prevCompInp = compiledInputs;
 
         if (LOG.isDebugEnabled())
             LOG.debug("Compiling physical operator " + op +
                 ". Current spark operator is " + curSparkOp);
 
-               List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op);
-               if (op instanceof PONative) {
-                       // the predecessor (store) has already been processed
-                       // don't process it again
-               } else if (predecessors != null && predecessors.size() > 0) {
-                       // When processing an entire script (multiquery), we can
-                       // get into a situation where a load has
-                       // predecessors. This means that it depends on some store
-                       // earlier in the physicalPlan. We need to take that dependency
-                       // and connect the respective Spark operators, while at the
-                       // same time removing the connection between the Physical
-                       // operators. That way the jobs will run in the right
-                       // order.
-                       if (op instanceof POLoad) {
-
-                               if (predecessors.size() != 1) {
-                                       int errCode = 2125;
-                                       String msg = "Expected at most one predecessor of load. Got "
-                                                       + predecessors.size();
-                                       throw new PlanException(msg, errCode, PigException.BUG);
-                               }
-
-                               PhysicalOperator p = predecessors.get(0);
-                               SparkOperator oper = null;
-                               if (p instanceof POStore || p instanceof PONative) {
-                                       oper = phyToSparkOpMap.get(p);
-                               } else {
-                                       int errCode = 2126;
-                                       String msg = "Predecessor of load should be a store or spark operator. Got "
-                                                       + p.getClass();
-                                       throw new PlanException(msg, errCode, PigException.BUG);
-                               }
-
-                               // Need new operator
-                               curSparkOp = getSparkOp();
-                               curSparkOp.add(op);
-                               sparkPlan.add(curSparkOp);
-                               physicalPlan.disconnect(op, p);
-                               sparkPlan.connect(oper, curSparkOp);
-                               phyToSparkOpMap.put(op, curSparkOp);
-                               return;
-                       }
-
-                       Collections.sort(predecessors);
-                       compiledInputs = new SparkOperator[predecessors.size()];
-                       int i = -1;
-                       for (PhysicalOperator pred : predecessors) {
-                               if (pred instanceof POSplit
-                                               && splitsSeen.containsKey(pred.getOperatorKey())) {
-                                       compiledInputs[++i] = startNew(
-                                                       ((POSplit) pred).getSplitStore(),
-                                                       splitsSeen.get(pred.getOperatorKey()));
-                                       continue;
-                               }
-                               compile(pred);
-                               compiledInputs[++i] = curSparkOp;
-                       }
-               } else {
-                       // No predecessors. Mostly a load. But this is where
-                       // we start. We create a new sparkOp and add its first
-                       // operator op. Also this should be added to the sparkPlan.
-                       curSparkOp = getSparkOp();
-                       curSparkOp.add(op);
-                       if (op != null && op instanceof POLoad) {
-                               if (((POLoad) op).getLFile() != null
-                                               && ((POLoad) op).getLFile().getFuncSpec() != null)
-                                       curSparkOp.UDFs.add(((POLoad) op).getLFile().getFuncSpec()
-                                                       .toString());
-                       }
-                       sparkPlan.add(curSparkOp);
-                       phyToSparkOpMap.put(op, curSparkOp);
-                       return;
-               }
-               op.visit(this);
-               compiledInputs = prevCompInp;
-       }
+        List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op);
+        if (op instanceof PONative) {
+            // the predecessor (store) has already been processed
+            // don't process it again
+        } else if (predecessors != null && predecessors.size() > 0) {
+            // When processing an entire script (multiquery), we can
+            // get into a situation where a load has
+            // predecessors. This means that it depends on some store
+            // earlier in the physicalPlan. We need to take that dependency
+            // and connect the respective Spark operators, while at the
+            // same time removing the connection between the Physical
+            // operators. That way the jobs will run in the right
+            // order.
+            if (op instanceof POLoad) {
+
+                if (predecessors.size() != 1) {
+                    int errCode = 2125;
+                    String msg = "Expected at most one predecessor of load. Got "
+                            + predecessors.size();
+                    throw new PlanException(msg, errCode, PigException.BUG);
+                }
+
+                PhysicalOperator p = predecessors.get(0);
+                SparkOperator oper = null;
+                if (p instanceof POStore || p instanceof PONative) {
+                    oper = phyToSparkOpMap.get(p);
+                } else {
+                    int errCode = 2126;
+                    String msg = "Predecessor of load should be a store or spark operator. Got "
+                            + p.getClass();
+                    throw new PlanException(msg, errCode, PigException.BUG);
+                }
+
+                // Need new operator
+                curSparkOp = getSparkOp();
+                curSparkOp.add(op);
+                sparkPlan.add(curSparkOp);
+                physicalPlan.disconnect(op, p);
+                sparkPlan.connect(oper, curSparkOp);
+                phyToSparkOpMap.put(op, curSparkOp);
+                return;
+            }
+
+            Collections.sort(predecessors);
+            compiledInputs = new SparkOperator[predecessors.size()];
+            int i = -1;
+            for (PhysicalOperator pred : predecessors) {
+                if (pred instanceof POSplit
+                        && splitsSeen.containsKey(pred.getOperatorKey())) {
+                    compiledInputs[++i] = startNew(
+                            ((POSplit) pred).getSplitStore(),
+                            splitsSeen.get(pred.getOperatorKey()));
+                    continue;
+                }
+                compile(pred);
+                compiledInputs[++i] = curSparkOp;
+            }
+        } else {
+            // No predecessors. Mostly a load. But this is where
+            // we start. We create a new sparkOp and add its first
+            // operator op. Also this should be added to the sparkPlan.
+            curSparkOp = getSparkOp();
+            curSparkOp.add(op);
+            if (op != null && op instanceof POLoad) {
+                if (((POLoad) op).getLFile() != null
+                        && ((POLoad) op).getLFile().getFuncSpec() != null)
+                    curSparkOp.UDFs.add(((POLoad) op).getLFile().getFuncSpec()
+                            .toString());
+            }
+            sparkPlan.add(curSparkOp);
+            phyToSparkOpMap.put(op, curSparkOp);
+            return;
+        }
+        op.visit(this);
+        compiledInputs = prevCompInp;
+    }
 
-       private SparkOperator getSparkOp() {
-               SparkOperator op = new SparkOperator(OperatorKey.genOpKey(scope));
+    private SparkOperator getSparkOp() {
+        SparkOperator op = new SparkOperator(OperatorKey.genOpKey(scope));
         if (LOG.isDebugEnabled())
             LOG.debug("Created new Spark operator " + op);
         return op;
-       }
+    }
+
+    public SparkOperPlan getSparkPlan() {
+        return sparkPlan;
+    }
+
+    public void connectSoftLink() throws PlanException, IOException {
+        for (PhysicalOperator op : physicalPlan) {
+            if (physicalPlan.getSoftLinkPredecessors(op) != null) {
+                for (PhysicalOperator pred : physicalPlan
+                        .getSoftLinkPredecessors(op)) {
+                    SparkOperator from = phyToSparkOpMap.get(pred);
+                    SparkOperator to = phyToSparkOpMap.get(op);
+                    if (from == to)
+                        continue;
+                    if (sparkPlan.getPredecessors(to) == null
+                            || !sparkPlan.getPredecessors(to).contains(from)) {
+                        sparkPlan.connect(from, to);
+                    }
+                }
+            }
+        }
+    }
 
-       public SparkOperPlan getSparkPlan() {
-               return sparkPlan;
-       }
-
-       public void connectSoftLink() throws PlanException, IOException {
-               for (PhysicalOperator op : physicalPlan) {
-                       if (physicalPlan.getSoftLinkPredecessors(op) != null) {
-                               for (PhysicalOperator pred : physicalPlan
-                                               .getSoftLinkPredecessors(op)) {
-                                       SparkOperator from = phyToSparkOpMap.get(pred);
-                                       SparkOperator to = phyToSparkOpMap.get(op);
-                                       if (from == to)
-                                               continue;
-                                       if (sparkPlan.getPredecessors(to) == null
-                                                       || !sparkPlan.getPredecessors(to).contains(from)) {
-                                               sparkPlan.connect(from, to);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       private SparkOperator startNew(FileSpec fSpec, SparkOperator old)
-                       throws PlanException {
-               POLoad ld = getLoad();
-               ld.setLFile(fSpec);
-               SparkOperator ret = getSparkOp();
-               ret.add(ld);
-               sparkPlan.add(ret);
-               sparkPlan.connect(old, ret);
-               return ret;
-       }
-
-       private POLoad getLoad() {
-               POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
-               ld.setPc(pigContext);
-               ld.setIsTmpLoad(true);
-               return ld;
-       }
-
-       @Override
-       public void visitSplit(POSplit op) throws VisitorException {
-               try {
-                       FileSpec fSpec = op.getSplitStore();
-                       SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec);
-                       sparkOp.setSplitter(true);
-                       splitsSeen.put(op.getOperatorKey(), sparkOp);
-                       curSparkOp = startNew(fSpec, sparkOp);
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       public void visitDistinct(PODistinct op) throws VisitorException {
-               try {
-                       addToPlan(op);
+    private SparkOperator startNew(FileSpec fSpec, SparkOperator old)
+            throws PlanException {
+        POLoad ld = getLoad();
+        ld.setLFile(fSpec);
+        SparkOperator ret = getSparkOp();
+        ret.add(ld);
+        sparkPlan.add(ret);
+        sparkPlan.connect(old, ret);
+        return ret;
+    }
+
+    private POLoad getLoad() {
+        POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        ld.setPc(pigContext);
+        ld.setIsTmpLoad(true);
+        return ld;
+    }
+
+    @Override
+    public void visitSplit(POSplit op) throws VisitorException {
+        try {
+            FileSpec fSpec = op.getSplitStore();
+            SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec);
+            sparkOp.setSplitter(true);
+            splitsSeen.put(op.getOperatorKey(), sparkOp);
+            curSparkOp = startNew(fSpec, sparkOp);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    public void visitDistinct(PODistinct op) throws VisitorException {
+        try {
+            addToPlan(op);
             phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       private SparkOperator endSingleInputPlanWithStr(FileSpec fSpec)
-                       throws PlanException {
-               if (compiledInputs.length > 1) {
-                       int errCode = 2023;
-                       String msg = "Received a multi input physicalPlan when expecting only a single input one.";
-                       throw new PlanException(msg, errCode, PigException.BUG);
-               }
-               SparkOperator sparkOp = compiledInputs[0]; // Load
-               POStore str = getStore();
-               str.setSFile(fSpec);
-               sparkOp.physicalPlan.addAsLeaf(str);
-               return sparkOp;
-       }
-
-       private POStore getStore() {
-               POStore st = new POStore(new OperatorKey(scope,
-                               nig.getNextNodeId(scope)));
-               // mark store as tmp store. These could be removed by the
-               // optimizer, because it wasn't the user requesting it.
-               st.setIsTmpStore(true);
-               return st;
-       }
-
-       @Override
-       public void visitLoad(POLoad op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitNative(PONative op) throws VisitorException {
-               try {
-                       SparkOperator nativesparkOpper = getNativeSparkOp(
-                                       op.getNativeMRjar(), op.getParams());
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    private SparkOperator endSingleInputPlanWithStr(FileSpec fSpec)
+            throws PlanException {
+        if (compiledInputs.length > 1) {
+            int errCode = 2023;
+            String msg = "Received a multi input physicalPlan when expecting only a single input one.";
+            throw new PlanException(msg, errCode, PigException.BUG);
+        }
+        SparkOperator sparkOp = compiledInputs[0]; // Load
+        POStore str = getStore();
+        str.setSFile(fSpec);
+        sparkOp.physicalPlan.addAsLeaf(str);
+        return sparkOp;
+    }
+
+    private POStore getStore() {
+        POStore st = new POStore(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+        // mark store as tmp store. These could be removed by the
+        // optimizer, because it wasn't the user requesting it.
+        st.setIsTmpStore(true);
+        return st;
+    }
+
+    @Override
+    public void visitLoad(POLoad op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitNative(PONative op) throws VisitorException {
+        try {
+            SparkOperator nativesparkOpper = getNativeSparkOp(
+                    op.getNativeMRjar(), op.getParams());
             nativesparkOpper.markNative();
-                       sparkPlan.add(nativesparkOpper);
-                       sparkPlan.connect(curSparkOp, nativesparkOpper);
-                       phyToSparkOpMap.put(op, nativesparkOpper);
-                       curSparkOp = nativesparkOpper;
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       private NativeSparkOperator getNativeSparkOp(String sparkJar,
-                       String[] parameters) {
-               return new NativeSparkOperator(new OperatorKey(scope,
-                               nig.getNextNodeId(scope)), sparkJar, parameters);
-       }
-
-       @Override
-       public void visitStore(POStore op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       phyToSparkOpMap.put(op, curSparkOp);
-                       if (op.getSFile() != null && op.getSFile().getFuncSpec() != null)
-                               curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString());
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitFilter(POFilter op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       processUDFs(op.getPlan());
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitCross(POCross op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitStream(POStream op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitSort(POSort op) throws VisitorException {
-               try {
+            sparkPlan.add(nativesparkOpper);
+            sparkPlan.connect(curSparkOp, nativesparkOpper);
+            phyToSparkOpMap.put(op, nativesparkOpper);
+            curSparkOp = nativesparkOpper;
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    private NativeSparkOperator getNativeSparkOp(String sparkJar,
+            String[] parameters) {
+        return new NativeSparkOperator(new OperatorKey(scope,
+                nig.getNextNodeId(scope)), sparkJar, parameters);
+    }
+
+    @Override
+    public void visitStore(POStore op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+            if (op.getSFile() != null && op.getSFile().getFuncSpec() != null)
+                curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString());
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitFilter(POFilter op) throws VisitorException {
+        try {
+            addToPlan(op);
+            processUDFs(op.getPlan());
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitCross(POCross op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitStream(POStream op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitSort(POSort op) throws VisitorException {
+        try {
             addToPlan(op);
             POSort sort = op;
             long limit = sort.getLimit();
@@ -496,143 +496,143 @@ public class SparkCompiler extends PhyPl
                 curSparkOp.markLimitAfterSort();
             }
             phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitLimit(POLimit op) throws VisitorException {
-               try {
-                       addToPlan(op);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitLimit(POLimit op) throws VisitorException {
+        try {
+            addToPlan(op);
             curSparkOp.markLimit();
             phyToSparkOpMap.put(op, curSparkOp);
         } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitLocalRearrange(POLocalRearrange op)
-                       throws VisitorException {
-               try {
-                       addToPlan(op);
-                       List<PhysicalPlan> plans = op.getPlans();
-                       if (plans != null)
-                               for (PhysicalPlan ep : plans)
-                                       processUDFs(ep);
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitCollectedGroup(POCollectedGroup op)
-                       throws VisitorException {
-               List<PhysicalOperator> roots = curSparkOp.physicalPlan.getRoots();
-               if (roots.size() != 1) {
-                       int errCode = 2171;
-                       String errMsg = "Expected one but found more then one root physical operator in physical physicalPlan.";
-                       throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
-               }
-
-               PhysicalOperator phyOp = roots.get(0);
-               if (!(phyOp instanceof POLoad)) {
-                       int errCode = 2172;
-                       String errMsg = "Expected physical operator at root to be POLoad. Found : "
-                                       + phyOp.getClass().getCanonicalName();
-                       throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
-               }
-
-               LoadFunc loadFunc = ((POLoad) phyOp).getLoadFunc();
-               try {
-                       if (!(CollectableLoadFunc.class.isAssignableFrom(loadFunc
-                                       .getClass()))) {
-                               int errCode = 2249;
-                               throw new SparkCompilerException(
-                                               "While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.",
-                                               errCode);
-                       }
-                       ((CollectableLoadFunc) loadFunc).ensureAllKeyInstancesInSameSplit();
-               } catch (SparkCompilerException e) {
-                       throw (e);
-               } catch (IOException e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-
-               try {
-                       addToPlan(op);
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitPOForEach(POForEach op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       List<PhysicalPlan> plans = op.getInputPlans();
-                       if (plans != null) {
-                               for (PhysicalPlan ep : plans) {
-                                       processUDFs(ep);
-                               }
-                       }
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitCounter(POCounter op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitRank(PORank op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitGlobalRearrange(POGlobalRearrange op)
-                       throws VisitorException {
-               try {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitLocalRearrange(POLocalRearrange op)
+            throws VisitorException {
+        try {
+            addToPlan(op);
+            List<PhysicalPlan> plans = op.getPlans();
+            if (plans != null)
+                for (PhysicalPlan ep : plans)
+                    processUDFs(ep);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitCollectedGroup(POCollectedGroup op)
+            throws VisitorException {
+        List<PhysicalOperator> roots = curSparkOp.physicalPlan.getRoots();
+        if (roots.size() != 1) {
+            int errCode = 2171;
+            String errMsg = "Expected one but found more then one root physical operator in physical physicalPlan.";
+            throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
+        }
+
+        PhysicalOperator phyOp = roots.get(0);
+        if (!(phyOp instanceof POLoad)) {
+            int errCode = 2172;
+            String errMsg = "Expected physical operator at root to be POLoad. Found : "
+                    + phyOp.getClass().getCanonicalName();
+            throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
+        }
+
+        LoadFunc loadFunc = ((POLoad) phyOp).getLoadFunc();
+        try {
+            if (!(CollectableLoadFunc.class.isAssignableFrom(loadFunc
+                    .getClass()))) {
+                int errCode = 2249;
+                throw new SparkCompilerException(
+                        "While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.",
+                        errCode);
+            }
+            ((CollectableLoadFunc) loadFunc).ensureAllKeyInstancesInSameSplit();
+        } catch (SparkCompilerException e) {
+            throw (e);
+        } catch (IOException e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitPOForEach(POForEach op) throws VisitorException {
+        try {
+            addToPlan(op);
+            List<PhysicalPlan> plans = op.getInputPlans();
+            if (plans != null) {
+                for (PhysicalPlan ep : plans) {
+                    processUDFs(ep);
+                }
+            }
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitCounter(POCounter op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitRank(PORank op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitGlobalRearrange(POGlobalRearrange op)
+            throws VisitorException {
+        try {
             POGlobalRearrangeSpark glbOp = new POGlobalRearrangeSpark(op);
             addToPlan(glbOp);
             if (op.isCross()) {
@@ -641,50 +641,50 @@ public class SparkCompiler extends PhyPl
 
             curSparkOp.customPartitioner = op.getCustomPartitioner();
             phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitPackage(POPackage op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       phyToSparkOpMap.put(op, curSparkOp);
-                       if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) {
-                               curSparkOp.markRegularJoin();
-                       } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) {
-                               if (op.getNumInps() == 1) {
-                                       curSparkOp.markGroupBy();
-                               } else if (op.getNumInps() > 1) {
-                                       curSparkOp.markCogroup();
-                               }
-                       }
-
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
-
-       @Override
-       public void visitUnion(POUnion op) throws VisitorException {
-               try {
-                       addToPlan(op);
-                       phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitPackage(POPackage op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+            if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) {
+                curSparkOp.markRegularJoin();
+            } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) {
+                if (op.getNumInps() == 1) {
+                    curSparkOp.markGroupBy();
+                } else if (op.getNumInps() > 1) {
+                    curSparkOp.markCogroup();
+                }
+            }
+
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitUnion(POUnion op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
             curSparkOp.markUnion();
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-       }
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
 
     /**
      * currently use regular join to replace skewedJoin
@@ -725,292 +725,292 @@ public class SparkCompiler extends PhyPl
 
     @Override
     public void visitFRJoin(POFRJoin op) throws VisitorException {
-               try {
-                       curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment()));
-                       for (int i = 0; i < compiledInputs.length; i++) {
-                               SparkOperator sparkOperator = compiledInputs[i];
-                               if (curSparkOp.equals(sparkOperator)) {
-                                       continue;
-                               }
-
-                               OperatorKey broadcastKey = new OperatorKey(scope, nig.getNextNodeId(scope));
-                               POBroadcastSpark poBroadcastSpark = new POBroadcastSpark(broadcastKey);
-                               poBroadcastSpark.setBroadcastedVariableName(broadcastKey.toString());
-
-                               sparkOperator.physicalPlan.addAsLeaf(poBroadcastSpark);
-                       }
-
-                       POFRJoinSpark poFRJoinSpark = new POFRJoinSpark(op);
-                       addToPlan(poFRJoinSpark);
-                       phyToSparkOpMap.put(op, curSparkOp);
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator " + op.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-               }
-    }
-
-       @Override
-       public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
-               try {
-                       if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
-                               int errCode=1101;
-                               throw new SparkCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
-                       }
-
-                       curSparkOp = phyToSparkOpMap.get(joinOp.getInputs().get(0));
-                       SparkOperator rightSparkOp;
-                       if(curSparkOp.equals(compiledInputs[0])) {
-                               rightSparkOp = compiledInputs[1];
-                       } else {
-                               rightSparkOp = compiledInputs[0];
-                       }
-
-                       PhysicalPlan rightPipelinePlan;
-                       PhysicalPlan rightPhyPlan = rightSparkOp.physicalPlan;
-                       if (rightPhyPlan.getRoots().size() != 1) {
-                               int errCode = 2171;
-                               String errMsg = "Expected one but found more then one root physical operator in physical plan.";
-                               throw new SparkCompilerException(errMsg,errCode);
-                       }
-                       PhysicalOperator rightPhyLoader = rightPhyPlan.getRoots().get(0);
-                       if (!(rightPhyLoader instanceof POLoad)) {
-                               int errCode = 2172;
-                               String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightPhyLoader.getClass().getCanonicalName();
-                               throw new SparkCompilerException(errMsg,errCode);
-                       }
-                       if (rightPhyPlan.getSuccessors(rightPhyLoader) == null || rightPhyPlan.getSuccessors(rightPhyLoader).isEmpty()) {
-                               // Load - Join case.
-                               rightPipelinePlan = null;
-                       } else{ // We got something on right side. Yank it and set it as inner plan of right input.
-                               rightPipelinePlan = rightPhyPlan.clone();
-                               PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
-                               rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
-                               rightPipelinePlan.remove(root);
-                               rightPhyPlan.trimBelow(rightPhyLoader);
-                       }
-
-                       joinOp.setupRightPipeline(rightPipelinePlan);
-                       rightSparkOp.setRequestedParallelism(1); // for indexing job
-
-                       POLoad rightLoader = (POLoad)rightSparkOp.physicalPlan.getRoots().get(0);
-                       joinOp.setSignature(rightLoader.getSignature());
-                       LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
-
-                       if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
-                               joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
-                               joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
-                               curSparkOp.UDFs.add(rightLoader.getLFile().getFuncSpec().toString());
-
-                               // we don't need the right rightSparkOp since
-                               // the right loader is an IndexableLoadFunc which can handle the index itself
-                               sparkPlan.remove(rightSparkOp);
-                               if(rightSparkOp == compiledInputs[0]) {
-                                       compiledInputs[0] = null;
-                               } else if(rightSparkOp == compiledInputs[1]) {
-                                       compiledInputs[1] = null;
-                               }
-
-                               // validate that the join keys in merge join are only
-                               // simple column projections or '*' and not expression - expressions
-                               // cannot be handled when the index is built by the storage layer on the sorted
-                               // data when the sorted data (and corresponding index) is written.
-                               // So merge join will be restricted not have expressions as join keys
-                               int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
-                               for(int i = 0; i < numInputs; i++) {
-                                       List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
-                                       for (PhysicalPlan keyPlan : keyPlans) {
-                                               for(PhysicalOperator op : keyPlan) {
-                                                       if(!(op instanceof POProject)) {
-                                                               int errCode = 1106;
-                                                               String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
-                                                                               rightLoader.getLFile().getFuncSpec() + " as the loader";
-                                                               throw new SparkCompilerException(errMsg, errCode, PigException.INPUT);
-                                                       }
-                                               }
-                                       }
-                               }
-
-                       } else {
-                               //Replacing POLoad with indexer is disabled for 'merge-sparse' joins.  While
-                               //this feature would be useful, the current implementation of DefaultIndexableLoader
-                               //is not designed to handle multiple calls to seekNear.  Specifically, it rereads the entire index
-                               //for each call.  Some refactoring of this class is required - and then the check below could be removed.
-                               if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
-                                       int errCode = 1104;
-                                       String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
-                                                       "The specified loader " + rightLoadFunc + " doesn't implement it";
-                                       throw new SparkCompilerException(errMsg,errCode);
-                               }
-
-                               // Replace POLoad with  indexer.
-                               if (! (OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass()))){
-                                       int errCode = 1104;
-                                       String errMsg = "Right input of merge-join must implement " +
-                                                       "OrderedLoadFunc interface. The specified loader "
-                                                       + rightLoadFunc + " doesn't implement it";
-                                       throw new SparkCompilerException(errMsg,errCode);
-                               }
-
-                               String[] indexerArgs = new String[6];
-                               List<PhysicalPlan> rightInpPlans = 
joinOp.getInnerPlansOf(1);
-                               FileSpec origRightLoaderFileSpec = 
rightLoader.getLFile();
-
-                               indexerArgs[0] = 
origRightLoaderFileSpec.getFuncSpec().toString();
-                               indexerArgs[1] = 
ObjectSerializer.serialize((Serializable)rightInpPlans);
-                               indexerArgs[2] = 
ObjectSerializer.serialize(rightPipelinePlan);
-                               indexerArgs[3] = rightLoader.getSignature();
-                               indexerArgs[4] = 
rightLoader.getOperatorKey().scope;
-                               indexerArgs[5] = Boolean.toString(true);
-
-                               FileSpec lFile = new 
FileSpec(rightLoader.getLFile().getFileName(),new 
FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
-                               rightLoader.setLFile(lFile);
-
-                               // (keyFirst1, keyFirst2, .. , position, 
splitIndex) See MergeJoinIndexer
-                               rightSparkOp.useTypedComparator(true);
-                               POStore idxStore = getStore();
-                               FileSpec idxStrFile = getTempFileSpec();
-                               idxStore.setSFile(idxStrFile);
-                               rightSparkOp.physicalPlan.addAsLeaf(idxStore);
-                               rightSparkOp.markIndexer();
-
-                               
curSparkOp.UDFs.add(origRightLoaderFileSpec.getFuncSpec().toString());
-
-                               // We want to ensure indexing job runs prior to 
actual join job.
-                               // So, connect them in order.
-                               sparkPlan.connect(rightSparkOp, curSparkOp);
-
-                               // set up the DefaultIndexableLoader for the 
join operator
-                               String[] defaultIndexableLoaderArgs = new 
String[5];
-                               defaultIndexableLoaderArgs[0] = 
origRightLoaderFileSpec.getFuncSpec().toString();
-                               defaultIndexableLoaderArgs[1] = 
idxStrFile.getFileName();
-                               defaultIndexableLoaderArgs[2] = 
idxStrFile.getFuncSpec().toString();
-                               defaultIndexableLoaderArgs[3] = 
joinOp.getOperatorKey().scope;
-                               defaultIndexableLoaderArgs[4] = 
origRightLoaderFileSpec.getFileName();
-                               joinOp.setRightLoaderFuncSpec((new 
FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
-                               
joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
-
-                               joinOp.setIndexFile(idxStrFile.getFileName());
-                       }
-
-                       curSparkOp.physicalPlan.addAsLeaf(joinOp);
-                       phyToSparkOpMap.put(joinOp, curSparkOp);
-
-               } catch (Exception e) {
-                       int errCode = 2034;
-                       String msg = "Error compiling operator "
-                                       + joinOp.getClass().getSimpleName();
-                       throw new SparkCompilerException(msg, errCode, 
PigException.BUG, e);
-               }
-       }
-
-       private void processUDFs(PhysicalPlan plan) throws VisitorException {
-               if (plan != null) {
-                       // Process Scalars (UDF with referencedOperators)
-                       ScalarPhyFinder scalarPhyFinder = new 
ScalarPhyFinder(plan);
-                       scalarPhyFinder.visit();
-                       curSparkOp.scalars.addAll(scalarPhyFinder.getScalars());
-
-                       // Process UDFs
-                       udfFinder.setPlan(plan);
-                       udfFinder.visit();
-                       curSparkOp.UDFs.addAll(udfFinder.getUDFs());
-               }
-       }
-
-       private void addToPlan(PhysicalOperator op) throws PlanException,
-                       IOException {
-               SparkOperator sparkOp = null;
-               if (compiledInputs.length == 1) {
-                       sparkOp = compiledInputs[0];
-               } else {
-                       sparkOp = merge(compiledInputs);
-               }
-               sparkOp.physicalPlan.addAsLeaf(op);
-               curSparkOp = sparkOp;
-       }
-
-       private SparkOperator merge(SparkOperator[] compiledInputs)
-                       throws PlanException {
-               SparkOperator ret = getSparkOp();
-               sparkPlan.add(ret);
+        try {
+            curSparkOp = 
phyToSparkOpMap.get(op.getInputs().get(op.getFragment()));
+            for (int i = 0; i < compiledInputs.length; i++) {
+                SparkOperator sparkOperator = compiledInputs[i];
+                if (curSparkOp.equals(sparkOperator)) {
+                    continue;
+                }
+
+                OperatorKey broadcastKey = new OperatorKey(scope, 
nig.getNextNodeId(scope));
+                POBroadcastSpark poBroadcastSpark = new 
POBroadcastSpark(broadcastKey);
+                
poBroadcastSpark.setBroadcastedVariableName(broadcastKey.toString());
+
+                sparkOperator.physicalPlan.addAsLeaf(poBroadcastSpark);
+            }
+
+            POFRJoinSpark poFRJoinSpark = new POFRJoinSpark(op);
+            addToPlan(poFRJoinSpark);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + 
op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, 
e);
+        }
+    }
+
+    @Override
+    public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
+        try {
+            if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
+                int errCode=1101;
+                throw new SparkCompilerException("Merge Join must have exactly 
two inputs. Found : "+compiledInputs.length, errCode);
+            }
+
+            curSparkOp = phyToSparkOpMap.get(joinOp.getInputs().get(0));
+            SparkOperator rightSparkOp;
+            if(curSparkOp.equals(compiledInputs[0])) {
+                rightSparkOp = compiledInputs[1];
+            } else {
+                rightSparkOp = compiledInputs[0];
+            }
+
+            PhysicalPlan rightPipelinePlan;
+            PhysicalPlan rightPhyPlan = rightSparkOp.physicalPlan;
+            if (rightPhyPlan.getRoots().size() != 1) {
+                int errCode = 2171;
+                String errMsg = "Expected one but found more then one root 
physical operator in physical plan.";
+                throw new SparkCompilerException(errMsg,errCode);
+            }
+            PhysicalOperator rightPhyLoader = rightPhyPlan.getRoots().get(0);
+            if (!(rightPhyLoader instanceof POLoad)) {
+                int errCode = 2172;
+                String errMsg = "Expected physical operator at root to be 
POLoad. Found : "+rightPhyLoader.getClass().getCanonicalName();
+                throw new SparkCompilerException(errMsg,errCode);
+            }
+            if (rightPhyPlan.getSuccessors(rightPhyLoader) == null || 
rightPhyPlan.getSuccessors(rightPhyLoader).isEmpty()) {
+                // Load - Join case.
+                rightPipelinePlan = null;
+            } else { // We got something on the right side. Yank it and set it as the inner plan of the right input.
+                rightPipelinePlan = rightPhyPlan.clone();
+                PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
+                rightPipelinePlan.disconnect(root, 
rightPipelinePlan.getSuccessors(root).get(0));
+                rightPipelinePlan.remove(root);
+                rightPhyPlan.trimBelow(rightPhyLoader);
+            }
+
+            joinOp.setupRightPipeline(rightPipelinePlan);
+            rightSparkOp.setRequestedParallelism(1); // for indexing job
+
+            POLoad rightLoader = 
(POLoad)rightSparkOp.physicalPlan.getRoots().get(0);
+            joinOp.setSignature(rightLoader.getSignature());
+            LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
+
+            
if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
+                
joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+                
joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+                
curSparkOp.UDFs.add(rightLoader.getLFile().getFuncSpec().toString());
+
+                // we don't need the rightSparkOp any more since
+                // the right loader is an IndexableLoadFunc which can handle the index itself
+                sparkPlan.remove(rightSparkOp);
+                if(rightSparkOp == compiledInputs[0]) {
+                    compiledInputs[0] = null;
+                } else if(rightSparkOp == compiledInputs[1]) {
+                    compiledInputs[1] = null;
+                }
+
+                // Validate that the join keys in a merge join are only
+                // simple column projections or '*' and not expressions - expressions
+                // cannot be handled when the index is built by the storage layer on the
+                // sorted data, at the time the sorted data (and corresponding index) is written.
+                // So merge join is restricted to not having expressions as join keys.
+                int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
+                for(int i = 0; i < numInputs; i++) {
+                    List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
+                    for (PhysicalPlan keyPlan : keyPlans) {
+                        for(PhysicalOperator op : keyPlan) {
+                            if(!(op instanceof POProject)) {
+                                int errCode = 1106;
+                                String errMsg = "Merge join is possible only 
for simple column or '*' join keys when using " +
+                                        rightLoader.getLFile().getFuncSpec() + 
" as the loader";
+                                throw new SparkCompilerException(errMsg, 
errCode, PigException.INPUT);
+                            }
+                        }
+                    }
+                }
+
+            } else {
+                //Replacing POLoad with indexer is disabled for 'merge-sparse' 
joins.  While
+                //this feature would be useful, the current implementation of 
DefaultIndexableLoader
+                //is not designed to handle multiple calls to seekNear.  
Specifically, it rereads the entire index
+                //for each call.  Some refactoring of this class is required - 
and then the check below could be removed.
+                if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+                    int errCode = 1104;
+                    String errMsg = "Right input of merge-join must implement 
IndexableLoadFunc. " +
+                            "The specified loader " + rightLoadFunc + " 
doesn't implement it";
+                    throw new SparkCompilerException(errMsg,errCode);
+                }
+
+                // Replace POLoad with indexer.
+                if (! (OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass()))){
(OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass()))){
+                    int errCode = 1104;
+                    String errMsg = "Right input of merge-join must implement 
" +
+                            "OrderedLoadFunc interface. The specified loader "
+                            + rightLoadFunc + " doesn't implement it";
+                    throw new SparkCompilerException(errMsg,errCode);
+                }
+
+                String[] indexerArgs = new String[6];
+                List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
+                FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
+
+                indexerArgs[0] = 
origRightLoaderFileSpec.getFuncSpec().toString();
+                indexerArgs[1] = 
ObjectSerializer.serialize((Serializable)rightInpPlans);
+                indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+                indexerArgs[3] = rightLoader.getSignature();
+                indexerArgs[4] = rightLoader.getOperatorKey().scope;
+                indexerArgs[5] = Boolean.toString(true);
+
+                FileSpec lFile = new 
FileSpec(rightLoader.getLFile().getFileName(),new 
FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+                rightLoader.setLFile(lFile);
+
+                // (keyFirst1, keyFirst2, .. , position, splitIndex) See 
MergeJoinIndexer
+                rightSparkOp.useTypedComparator(true);
+                POStore idxStore = getStore();
+                FileSpec idxStrFile = getTempFileSpec();
+                idxStore.setSFile(idxStrFile);
+                rightSparkOp.physicalPlan.addAsLeaf(idxStore);
+                rightSparkOp.markIndexer();
+
+                
curSparkOp.UDFs.add(origRightLoaderFileSpec.getFuncSpec().toString());
+
+                // We want to ensure indexing job runs prior to actual join 
job.
+                // So, connect them in order.
+                sparkPlan.connect(rightSparkOp, curSparkOp);
+
+                // set up the DefaultIndexableLoader for the join operator
+                String[] defaultIndexableLoaderArgs = new String[5];
+                defaultIndexableLoaderArgs[0] = 
origRightLoaderFileSpec.getFuncSpec().toString();
+                defaultIndexableLoaderArgs[1] = idxStrFile.getFileName();
+                defaultIndexableLoaderArgs[2] = 
idxStrFile.getFuncSpec().toString();
+                defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+                defaultIndexableLoaderArgs[4] = 
origRightLoaderFileSpec.getFileName();
+                joinOp.setRightLoaderFuncSpec((new 
FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+                
joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
+                joinOp.setIndexFile(idxStrFile.getFileName());
+            }
+
+            curSparkOp.physicalPlan.addAsLeaf(joinOp);
+            phyToSparkOpMap.put(joinOp, curSparkOp);
+
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + joinOp.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, 
e);
+        }
+    }
+
+    private void processUDFs(PhysicalPlan plan) throws VisitorException {
+        if (plan != null) {
+            // Process Scalars (UDF with referencedOperators)
+            ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
+            scalarPhyFinder.visit();
+            curSparkOp.scalars.addAll(scalarPhyFinder.getScalars());
+
+            // Process UDFs
+            udfFinder.setPlan(plan);
+            udfFinder.visit();
+            curSparkOp.UDFs.addAll(udfFinder.getUDFs());
+        }
+    }
+
+    private void addToPlan(PhysicalOperator op) throws PlanException,
+            IOException {
+        SparkOperator sparkOp = null;
+        if (compiledInputs.length == 1) {
+            sparkOp = compiledInputs[0];
+        } else {
+            sparkOp = merge(compiledInputs);
+        }
+        sparkOp.physicalPlan.addAsLeaf(op);
+        curSparkOp = sparkOp;
+    }
+
+    private SparkOperator merge(SparkOperator[] compiledInputs)
+            throws PlanException {
+        SparkOperator ret = getSparkOp();
+        sparkPlan.add(ret);
 
         Set<SparkOperator> toBeConnected = new HashSet<SparkOperator>();
-               List<SparkOperator> toBeRemoved = new 
ArrayList<SparkOperator>();
+        List<SparkOperator> toBeRemoved = new ArrayList<SparkOperator>();
 
-               List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>();
+        List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>();
 
-               for (SparkOperator sparkOp : compiledInputs) {
+        for (SparkOperator sparkOp : compiledInputs) {
             if (LOG.isDebugEnabled())
                 LOG.debug("Merging Spark operator" + sparkOp);
-                       toBeRemoved.add(sparkOp);
-                       toBeMerged.add(sparkOp.physicalPlan);
-                       List<SparkOperator> predecessors = sparkPlan
-                                       .getPredecessors(sparkOp);
-                       if (predecessors != null) {
-                               for (SparkOperator predecessorSparkOp : 
predecessors) {
-                                       toBeConnected.add(predecessorSparkOp);
-                               }
-                       }
-               }
-               merge(ret.physicalPlan, toBeMerged);
-
-               Iterator<SparkOperator> it = toBeConnected.iterator();
-               while (it.hasNext())
-                       sparkPlan.connect(it.next(), ret);
-               for (SparkOperator removeSparkOp : toBeRemoved) {
-                       if (removeSparkOp.requestedParallelism > 
ret.requestedParallelism)
-                               ret.requestedParallelism = 
removeSparkOp.requestedParallelism;
-                       for (String udf : removeSparkOp.UDFs) {
-                               if (!ret.UDFs.contains(udf))
-                                       ret.UDFs.add(udf);
-                       }
-                       // We also need to change scalar marking
-                       for (PhysicalOperator physOp : removeSparkOp.scalars) {
-                               if (!ret.scalars.contains(physOp)) {
-                                       ret.scalars.add(physOp);
-                               }
-                       }
-                       
-                       if(removeSparkOp.getCrossKeys()!=null){
-                               for(String crossKey: 
removeSparkOp.getCrossKeys())
-                                  ret.addCrossKey(crossKey);
-                       }
-                       
-                       
-                       Set<PhysicalOperator> opsToChange = new 
HashSet<PhysicalOperator>();
-                       for (Map.Entry<PhysicalOperator, SparkOperator> entry : 
phyToSparkOpMap
-                                       .entrySet()) {
-                               if (entry.getValue() == removeSparkOp) {
-                                       opsToChange.add(entry.getKey());
-                               }
-                       }
-                       for (PhysicalOperator op : opsToChange) {
-                               phyToSparkOpMap.put(op, ret);
-                       }
-
-                       sparkPlan.remove(removeSparkOp);
-               }
-               return ret;
-       }
-
-       /**
-        * The merge of a list of plans into a single physicalPlan
-        * 
-        * @param <O>
-        * @param <E>
-        * @param finPlan
-        *            - Final Plan into which the list of plans is merged
-        * @param plans
-        *            - list of plans to be merged
-        * @throws PlanException
-        */
-       private <O extends Operator<?>, E extends OperatorPlan<O>> void merge(
-                       E finPlan, List<E> plans) throws PlanException {
-               for (E e : plans) {
-                       finPlan.merge(e);
-               }
-       }
+            toBeRemoved.add(sparkOp);
+            toBeMerged.add(sparkOp.physicalPlan);
+            List<SparkOperator> predecessors = sparkPlan
+                    .getPredecessors(sparkOp);
+            if (predecessors != null) {
+                for (SparkOperator predecessorSparkOp : predecessors) {
+                    toBeConnected.add(predecessorSparkOp);
+                }
+            }
+        }
+        merge(ret.physicalPlan, toBeMerged);
+
+        Iterator<SparkOperator> it = toBeConnected.iterator();
+        while (it.hasNext())
+            sparkPlan.connect(it.next(), ret);
+        for (SparkOperator removeSparkOp : toBeRemoved) {
+            if (removeSparkOp.requestedParallelism > ret.requestedParallelism)
+                ret.requestedParallelism = removeSparkOp.requestedParallelism;
+            for (String udf : removeSparkOp.UDFs) {
+                if (!ret.UDFs.contains(udf))
+                    ret.UDFs.add(udf);
+            }
+            // We also need to change scalar marking
+            for (PhysicalOperator physOp : removeSparkOp.scalars) {
+                if (!ret.scalars.contains(physOp)) {
+                    ret.scalars.add(physOp);
+                }
+            }
+            
+            if(removeSparkOp.getCrossKeys()!=null){
+                for(String crossKey: removeSparkOp.getCrossKeys())
+                   ret.addCrossKey(crossKey);
+            }
+            
+            
+            Set<PhysicalOperator> opsToChange = new 
HashSet<PhysicalOperator>();
+            for (Map.Entry<PhysicalOperator, SparkOperator> entry : 
phyToSparkOpMap
+                    .entrySet()) {
+                if (entry.getValue() == removeSparkOp) {
+                    opsToChange.add(entry.getKey());
+                }
+            }
+            for (PhysicalOperator op : opsToChange) {
+                phyToSparkOpMap.put(op, ret);
+            }
+
+            sparkPlan.remove(removeSparkOp);
+        }
+        return ret;
+    }
+
+    /**
+     * The merge of a list of plans into a single physicalPlan
+     * 
+     * @param <O>
+     * @param <E>
+     * @param finPlan
+     *            - Final Plan into which the list of plans is merged
+     * @param plans
+     *            - list of plans to be merged
+     * @throws PlanException
+     */
+    private <O extends Operator<?>, E extends OperatorPlan<O>> void merge(
+            E finPlan, List<E> plans) throws PlanException {
+        for (E e : plans) {
+            finPlan.merge(e);
+        }
+    }
 
     @Override
     public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws 
VisitorException {
@@ -1200,24 +1200,24 @@ public class SparkCompiler extends PhyPl
     /**
      * build a POPoissonSampleSpark operator for SkewedJoin's sampling job
      */
-       private void addSampleOperatorForSkewedJoin(SparkOperator sampleSparkOp)
-                       throws PlanException {
-               Configuration conf = 
ConfigurationUtil.toConfiguration(pigProperties);
-               int sampleRate = conf.getInt(
-                               
PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE,
-                               POPoissonSampleSpark.DEFAULT_SAMPLE_RATE);
-               float heapPerc = conf.getFloat(
-                               PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
-                               PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
-               long totalMemory = conf.getLong(
-                               PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1);
-
-               POPoissonSampleSpark poSample = new POPoissonSampleSpark(
-                               new OperatorKey(scope, 
nig.getNextNodeId(scope)), -1,
-                               sampleRate, heapPerc, totalMemory);
+    private void addSampleOperatorForSkewedJoin(SparkOperator sampleSparkOp)
+            throws PlanException {
+        Configuration conf = ConfigurationUtil.toConfiguration(pigProperties);
+        int sampleRate = conf.getInt(
+                PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE,
+                POPoissonSampleSpark.DEFAULT_SAMPLE_RATE);
+        float heapPerc = conf.getFloat(
+                PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
+                PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+        long totalMemory = conf.getLong(
+                PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1);
+
+        POPoissonSampleSpark poSample = new POPoissonSampleSpark(
+                new OperatorKey(scope, nig.getNextNodeId(scope)), -1,
+                sampleRate, heapPerc, totalMemory);
 
-               sampleSparkOp.physicalPlan.addAsLeaf(poSample);
-       }
+        sampleSparkOp.physicalPlan.addAsLeaf(poSample);
+    }
 
     private SparkOperator getSortJob(
             POSort sort,
@@ -1480,17 +1480,17 @@ public class SparkCompiler extends PhyPl
         throw new PlanException(msg, errCode, PigException.BUG);
     }
 
-       /**
-        * Add POBroadcastSpark operator to broadcast key distribution for 
SkewedJoin's sampling job
-        * @param sampleSparkOp
-        * @throws PlanException
-        */
-       private void buildBroadcastForSkewedJoin(SparkOperator sampleSparkOp, 
String pigKeyDistFile) throws PlanException {
-
-               POBroadcastSpark poBroadcast = new POBroadcastSpark(new 
OperatorKey(scope, nig.getNextNodeId(scope)));
-               poBroadcast.setBroadcastedVariableName(pigKeyDistFile);
-               sampleSparkOp.physicalPlan.addAsLeaf(poBroadcast);
-       }
+    /**
+     * Add POBroadcastSpark operator to broadcast key distribution for 
SkewedJoin's sampling job
+     * @param sampleSparkOp
+     * @throws PlanException
+     */
+    private void buildBroadcastForSkewedJoin(SparkOperator sampleSparkOp, 
String pigKeyDistFile) throws PlanException {
+
+        POBroadcastSpark poBroadcast = new POBroadcastSpark(new 
OperatorKey(scope, nig.getNextNodeId(scope)));
+        poBroadcast.setBroadcastedVariableName(pigKeyDistFile);
+        sampleSparkOp.physicalPlan.addAsLeaf(poBroadcast);
+    }
 
     /**
      * Create Sampling job for skewed join.

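A note on the visitFRJoin() change above: every input other than the fragment (the streamed side) gets a POBroadcastSpark appended as the leaf of its physical plan, so Spark can ship the replicated inputs as broadcast variables to the POFRJoinSpark operator. Below is a minimal, self-contained sketch of that shape; the Op class is a hypothetical stand-in for SparkOperator, not the real Pig plan type.

import java.util.ArrayList;
import java.util.List;

public class FRJoinBroadcastSketch {
    // Stand-in for SparkOperator: the physical plan is reduced to a list of operator names.
    static class Op {
        final String name;
        final List<String> physicalPlan = new ArrayList<>();
        Op(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        Op left = new Op("left");    // fragment input: streamed, not broadcast
        Op right = new Op("right");  // replicated input: gets the broadcast leaf
        Op fragmentOp = left;        // visitFRJoin derives this via op.getFragment()

        for (Op input : new Op[] { left, right }) {
            if (fragmentOp.equals(input)) {
                continue;            // skip the fragment side, as the patch does
            }
            // The patch appends a POBroadcastSpark keyed by a fresh OperatorKey here.
            input.physicalPlan.add("POBroadcastSpark");
        }

        System.out.println("left:  " + left.physicalPlan);   // []
        System.out.println("right: " + right.physicalPlan);  // [POBroadcastSpark]
    }
}
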
Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
 Wed Apr 12 02:20:20 2017
@@ -25,17 +25,15 @@ import org.apache.pig.impl.plan.VisitorE
  * A visitor for the SparkOperPlan class
  */
 public class SparkOpPlanVisitor extends
-               PlanVisitor<SparkOperator, SparkOperPlan> {
+        PlanVisitor<SparkOperator, SparkOperPlan> {
 
-       public SparkOpPlanVisitor(SparkOperPlan plan,
-                       PlanWalker<SparkOperator, SparkOperPlan> walker) {
-               super(plan, walker);
-               // TODO Auto-generated constructor stub
-       }
+    public SparkOpPlanVisitor(SparkOperPlan plan,
+            PlanWalker<SparkOperator, SparkOperPlan> walker) {
+        super(plan, walker);
+    }
 
-       public void visitSparkOp(SparkOperator sparkOperator)
-                       throws VisitorException {
-               // TODO Auto-generated method stub
-       }
+    public void visitSparkOp(SparkOperator sparkOperator)
+            throws VisitorException {
+    }
 
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
 Wed Apr 12 02:20:20 2017
@@ -35,18 +35,18 @@ import org.apache.pig.impl.util.MultiMap
  * execute in spark.
  */
 public class SparkOperator extends Operator<SparkOpPlanVisitor> {
-       private static enum OPER_FEATURE {
-               NONE,
-               // Indicate if this job is a sampling job
-               SAMPLER,
-               // Indicate if this job is a merge indexer
-               INDEXER,
-               // Indicate if this job is a group by job
-               GROUPBY,
-               // Indicate if this job is a cogroup job
-               COGROUP,
-               // Indicate if this job is a regular join job
-               HASHJOIN,
+    private static enum OPER_FEATURE {
+        NONE,
+        // Indicate if this job is a sampling job
+        SAMPLER,
+        // Indicate if this job is a merge indexer
+        INDEXER,
+        // Indicate if this job is a group by job
+        GROUPBY,
+        // Indicate if this job is a cogroup job
+        COGROUP,
+        // Indicate if this job is a regular join job
+        HASHJOIN,
         // Indicate if this job is a union job
         UNION,
         // Indicate if this job is a native job
@@ -55,32 +55,32 @@ public class SparkOperator extends Opera
         LIMIT,
         // Indicate if this job is a limit job after sort
         LIMIT_AFTER_SORT;
-       };
+    };
 
-       public PhysicalPlan physicalPlan;
+    public PhysicalPlan physicalPlan;
 
-       public Set<String> UDFs;
+    public Set<String> UDFs;
 
-       /* Name of the Custom Partitioner used */
-       public String customPartitioner = null;
+    /* Name of the Custom Partitioner used */
+    public String customPartitioner = null;
 
-       public Set<PhysicalOperator> scalars;
+    public Set<PhysicalOperator> scalars;
 
-       public int requestedParallelism = -1;
+    public int requestedParallelism = -1;
 
     private BitSet feature = new BitSet();
 
-       private boolean splitter = false;
+    private boolean splitter = false;
 
-       // Name of the partition file generated by sampling process,
-       // Used by Skewed Join
-       private String skewedJoinPartitionFile;
+    // Name of the partition file generated by sampling process,
+    // Used by Skewed Join
+    private String skewedJoinPartitionFile;
 
-       private boolean usingTypedComparator = false;
+    private boolean usingTypedComparator = false;
 
-       private boolean combineSmallSplits = true;
+    private boolean combineSmallSplits = true;
 
-       private List<String> crossKeys = null;
+    private List<String> crossKeys = null;
 
     private MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionMap 
= new MultiMap<OperatorKey, OperatorKey>();
 
@@ -100,126 +100,126 @@ public class SparkOperator extends Opera
         scalars = new HashSet<PhysicalOperator>();
     }
 
-       @Override
-       public boolean supportsMultipleInputs() {
-               return true;
-       }
-
-       @Override
-       public boolean supportsMultipleOutputs() {
-               return true;
-       }
-
-       @Override
-       public String name() {
-               String udfStr = getUDFsAsStr();
-               StringBuilder sb = new StringBuilder("Spark" + "("
-                               + requestedParallelism + (udfStr.equals("") ? 
"" : ",")
-                               + udfStr + ")" + " - " + mKey.toString());
-               return sb.toString();
-       }
-
-       private String getUDFsAsStr() {
-               StringBuilder sb = new StringBuilder();
-               if (UDFs != null && UDFs.size() > 0) {
-                       for (String str : UDFs) {
-                               sb.append(str.substring(str.lastIndexOf('.') + 
1));
-                               sb.append(',');
-                       }
-                       sb.deleteCharAt(sb.length() - 1);
-               }
-               return sb.toString();
-       }
-
-       public void add(PhysicalOperator physicalOper) {
-               this.physicalPlan.add(physicalOper);
-       }
-
-       @Override
-       public void visit(SparkOpPlanVisitor v) throws VisitorException {
-               v.visitSparkOp(this);
-       }
-
-       public void addCrossKey(String key) {
-               if (crossKeys == null) {
-                       crossKeys = new ArrayList<String>();
-               }
-               crossKeys.add(key);
-       }
-
-       public List<String> getCrossKeys() {
-               return crossKeys;
-       }
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        String udfStr = getUDFsAsStr();
+        StringBuilder sb = new StringBuilder("Spark" + "("
+                + requestedParallelism + (udfStr.equals("") ? "" : ",")
+                + udfStr + ")" + " - " + mKey.toString());
+        return sb.toString();
+    }
+
+    private String getUDFsAsStr() {
+        StringBuilder sb = new StringBuilder();
+        if (UDFs != null && UDFs.size() > 0) {
+            for (String str : UDFs) {
+                sb.append(str.substring(str.lastIndexOf('.') + 1));
+                sb.append(',');
+            }
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+
+    public void add(PhysicalOperator physicalOper) {
+        this.physicalPlan.add(physicalOper);
+    }
+
+    @Override
+    public void visit(SparkOpPlanVisitor v) throws VisitorException {
+        v.visitSparkOp(this);
+    }
+
+    public void addCrossKey(String key) {
+        if (crossKeys == null) {
+            crossKeys = new ArrayList<String>();
+        }
+        crossKeys.add(key);
+    }
+
+    public List<String> getCrossKeys() {
+        return crossKeys;
+    }
 
-       public boolean isGroupBy() {
+    public boolean isGroupBy() {
         return feature.get(OPER_FEATURE.GROUPBY.ordinal());
     }
 
-       public void markGroupBy() {
+    public void markGroupBy() {
         feature.set(OPER_FEATURE.GROUPBY.ordinal());
     }
 
-       public boolean isCogroup() {
+    public boolean isCogroup() {
         return feature.get(OPER_FEATURE.COGROUP.ordinal());
     }
 
-       public void markCogroup() {
+    public void markCogroup() {
         feature.set(OPER_FEATURE.COGROUP.ordinal());
     }
 
-       public boolean isRegularJoin() {
+    public boolean isRegularJoin() {
         return feature.get(OPER_FEATURE.HASHJOIN.ordinal());
     }
 
-       public void markRegularJoin() {
+    public void markRegularJoin() {
         feature.set(OPER_FEATURE.HASHJOIN.ordinal());
     }
 
-       public int getRequestedParallelism() {
-               return requestedParallelism;
-       }
-
-       public void setSplitter(boolean spl) {
-               splitter = spl;
-       }
-
-       public boolean isSplitter() {
-               return splitter;
-       }
+    public int getRequestedParallelism() {
+        return requestedParallelism;
+    }
 
-       public boolean isSampler() {
+    public void setSplitter(boolean spl) {
+        splitter = spl;
+    }
+
+    public boolean isSplitter() {
+        return splitter;
+    }
+
+    public boolean isSampler() {
         return feature.get(OPER_FEATURE.SAMPLER.ordinal());
     }
 
-       public void markSampler() {
+    public void markSampler() {
         feature.set(OPER_FEATURE.SAMPLER.ordinal());
     }
 
-       public void setSkewedJoinPartitionFile(String file) {
-               skewedJoinPartitionFile = file;
-       }
-
-       public String getSkewedJoinPartitionFile() {
-               return skewedJoinPartitionFile;
-       }
-
-       protected boolean usingTypedComparator() {
-               return usingTypedComparator;
-       }
-
-       protected void useTypedComparator(boolean useTypedComparator) {
-               this.usingTypedComparator = useTypedComparator;
-       }
-
-       protected void noCombineSmallSplits() {
-               combineSmallSplits = false;
-       }
-
-       public boolean combineSmallSplits() {
-               return combineSmallSplits;
-       }
+    public void setSkewedJoinPartitionFile(String file) {
+        skewedJoinPartitionFile = file;
+    }
+
+    public String getSkewedJoinPartitionFile() {
+        return skewedJoinPartitionFile;
+    }
+
+    protected boolean usingTypedComparator() {
+        return usingTypedComparator;
+    }
+
+    protected void useTypedComparator(boolean useTypedComparator) {
+        this.usingTypedComparator = useTypedComparator;
+    }
+
+    protected void noCombineSmallSplits() {
+        combineSmallSplits = false;
+    }
+
+    public boolean combineSmallSplits() {
+        return combineSmallSplits;
+    }
 
-       public boolean isIndexer() {
+    public boolean isIndexer() {
         return feature.get(OPER_FEATURE.INDEXER.ordinal());
     }
 

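One detail of SparkOperator worth calling out: job roles (sampler, indexer, group-by, and so on) are tracked in a single java.util.BitSet indexed by enum ordinals, rather than one boolean field per role. A trimmed, runnable sketch of the pattern follows; the enum values are abbreviated from the real OPER_FEATURE.

import java.util.BitSet;

public class OperFeatureSketch {
    private enum Feature { SAMPLER, INDEXER, GROUPBY }

    private final BitSet feature = new BitSet();

    boolean isSampler() { return feature.get(Feature.SAMPLER.ordinal()); }
    void markSampler()  { feature.set(Feature.SAMPLER.ordinal()); }

    public static void main(String[] args) {
        OperFeatureSketch op = new OperFeatureSketch();
        System.out.println(op.isSampler()); // false
        op.markSampler();
        System.out.println(op.isSampler()); // true
    }
}
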
Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
 Wed Apr 12 02:20:20 2017
@@ -45,101 +45,101 @@ import org.apache.pig.impl.util.Pair;
  * stitched in to the "value"
  */
 public class SparkPOPackageAnnotator extends SparkOpPlanVisitor {
-       private static final Log LOG = 
LogFactory.getLog(SparkPOPackageAnnotator.class);
+    private static final Log LOG = 
LogFactory.getLog(SparkPOPackageAnnotator.class);
 
-       public SparkPOPackageAnnotator(SparkOperPlan plan) {
-               super(plan, new DepthFirstWalker<SparkOperator, 
SparkOperPlan>(plan));
-       }
-
-       @Override
-       public void visitSparkOp(SparkOperator sparkOp) throws VisitorException 
{
-               if (!sparkOp.physicalPlan.isEmpty()) {
-                       PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(
-                                       sparkOp.physicalPlan);
-                       pkgDiscoverer.visit();
-               }
-       }
-
-       static class PackageDiscoverer extends PhyPlanVisitor {
-               private POPackage pkg;
-               private PhysicalPlan plan;
-
-               public PackageDiscoverer(PhysicalPlan plan) {
-                       super(plan, new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(
-                                       plan));
-                       this.plan = plan;
-               }
-
-               @Override
-               public void visitPackage(POPackage pkg) throws VisitorException 
{
-                       this.pkg = pkg;
-
-                       // Find POLocalRearrange(s) corresponding to this 
POPackage
-                       PhysicalOperator graOp = 
plan.getPredecessors(pkg).get(0);
-                       if (! (graOp instanceof POGlobalRearrange)) {
-                                 throw new OptimizerException("Package 
operator is not preceded by " +
-                                           "GlobalRearrange operator in Spark 
Plan", 2087, PigException.BUG);
-                       }
+    public SparkPOPackageAnnotator(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        if (!sparkOp.physicalPlan.isEmpty()) {
+            PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(
+                    sparkOp.physicalPlan);
+            pkgDiscoverer.visit();
+        }
+    }
+
+    static class PackageDiscoverer extends PhyPlanVisitor {
+        private POPackage pkg;
+        private PhysicalPlan plan;
+
+        public PackageDiscoverer(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.plan = plan;
+        }
+
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            this.pkg = pkg;
+
+            // Find POLocalRearrange(s) corresponding to this POPackage
+            PhysicalOperator graOp = plan.getPredecessors(pkg).get(0);
+            if (! (graOp instanceof POGlobalRearrange)) {
+                  throw new OptimizerException("Package operator is not 
preceded by " +
+                        "GlobalRearrange operator in Spark Plan", 2087, 
PigException.BUG);
+            }
 
-                       List<PhysicalOperator> lraOps = 
plan.getPredecessors(graOp);
-                       if (pkg.getNumInps() != lraOps.size()) {
+            List<PhysicalOperator> lraOps = plan.getPredecessors(graOp);
+            if (pkg.getNumInps() != lraOps.size()) {
           throw new OptimizerException("Unexpected problem during 
optimization. " +
-                                                       "Could not find all 
LocalRearrange operators. Expected " + pkg.getNumInps() +
-                                           ". Got " + lraOps.size() + ".", 
2086, PigException.BUG);
-                       }
-                       Collections.sort(lraOps);
-                       for (PhysicalOperator op : lraOps) {
-                               if (! (op instanceof POLocalRearrange)) {
-                                       throw new 
OptimizerException("GlobalRearrange operator can only be preceded by " +
-                                                       "LocalRearrange 
operator(s) in Spark Plan", 2087, PigException.BUG);
-                               }
-                               annotatePkgWithLRA((POLocalRearrange)op);
-                       }
-               };
-
-               private void annotatePkgWithLRA(POLocalRearrange lrearrange)
-                               throws VisitorException {
-
-                       Map<Integer, Pair<Boolean, Map<Integer, Integer>>> 
keyInfo;
-                       if (LOG.isDebugEnabled())
-                            LOG.debug("Annotating package " + pkg + " with 
localrearrange operator "
+                            "Could not find all LocalRearrange operators. 
Expected " + pkg.getNumInps() +
+                        ". Got " + lraOps.size() + ".", 2086, 
PigException.BUG);
+            }
+            Collections.sort(lraOps);
+            for (PhysicalOperator op : lraOps) {
+                if (! (op instanceof POLocalRearrange)) {
+                    throw new OptimizerException("GlobalRearrange operator can 
only be preceded by " +
+                            "LocalRearrange operator(s) in Spark Plan", 2087, 
PigException.BUG);
+                }
+                annotatePkgWithLRA((POLocalRearrange)op);
+            }
+        };
+
+        private void annotatePkgWithLRA(POLocalRearrange lrearrange)
+                throws VisitorException {
+
+            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+            if (LOG.isDebugEnabled())
+                 LOG.debug("Annotating package " + pkg + " with localrearrange 
operator "
                + lrearrange + " with index " + lrearrange.getIndex());
 
-                       if (pkg.getPkgr() instanceof LitePackager) {
-                               if (lrearrange.getIndex() != 0) {
-                                       throw new RuntimeException(
-                                                       "POLocalRearrange for 
POPackageLite cannot have index other than 0, but has index - "
-                                                                       + 
lrearrange.getIndex());
-                               }
-                       }
-
-                       // annotate the package with information from the 
LORearrange
-                       // update the keyInfo information if already present in 
the
-                       // POPackage
-                       keyInfo = pkg.getPkgr().getKeyInfo();
-                       if (keyInfo == null)
-                               keyInfo = new HashMap<Integer, Pair<Boolean, 
Map<Integer, Integer>>>();
-
-                       if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) 
!= null) {
-                               // something is wrong - we should not be 
getting key info
-                               // for the same index from two different Local 
Rearranges
-                               int errCode = 2087;
-                               String msg = "Unexpected problem during 
optimization."
-                                               + " Found index:" + 
lrearrange.getIndex()
-                                               + " in multiple LocalRearrange 
operators.";
-                               throw new OptimizerException(msg, errCode, 
PigException.BUG);
-
-                       }
-                       keyInfo.put(
-                                       Integer.valueOf(lrearrange.getIndex()),
-                                       new Pair<Boolean, Map<Integer, 
Integer>>(lrearrange
-                                                       .isProjectStar(), 
lrearrange.getProjectedColsMap()));
-                       if (LOG.isDebugEnabled())
+            if (pkg.getPkgr() instanceof LitePackager) {
+                if (lrearrange.getIndex() != 0) {
+                    throw new RuntimeException(
+                            "POLocalRearrange for POPackageLite cannot have 
index other than 0, but has index - "
+                                    + lrearrange.getIndex());
+                }
+            }
+
+            // annotate the package with information from the LORearrange
+            // update the keyInfo information if already present in the
+            // POPackage
+            keyInfo = pkg.getPkgr().getKeyInfo();
+            if (keyInfo == null)
+                keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, 
Integer>>>();
+
+            if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+                // something is wrong - we should not be getting key info
+                // for the same index from two different Local Rearranges
+                int errCode = 2087;
+                String msg = "Unexpected problem during optimization."
+                        + " Found index:" + lrearrange.getIndex()
+                        + " in multiple LocalRearrange operators.";
+                throw new OptimizerException(msg, errCode, PigException.BUG);
+
+            }
+            keyInfo.put(
+                    Integer.valueOf(lrearrange.getIndex()),
+                    new Pair<Boolean, Map<Integer, Integer>>(lrearrange
+                            .isProjectStar(), 
lrearrange.getProjectedColsMap()));
+            if (LOG.isDebugEnabled())
           LOG.debug("KeyInfo for packager for package operator " + pkg + " is "
               + keyInfo );
-                       pkg.getPkgr().setKeyInfo(keyInfo);
-                       pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
-                       
pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
-               }
-       }
+            pkg.getPkgr().setKeyInfo(keyInfo);
+            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+        }
+    }
 }
\ No newline at end of file

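The annotatePkgWithLRA() logic above keys the packager's keyInfo map by each POLocalRearrange index and treats a repeated index as a bug (error 2087). Here is a simplified, runnable sketch of that guard, with a plain String standing in for the real Pair<Boolean, Map<Integer, Integer>> value.

import java.util.HashMap;
import java.util.Map;

public class KeyInfoSketch {
    public static void main(String[] args) {
        // Stand-in for pkgr.getKeyInfo(): LocalRearrange index -> projection info.
        Map<Integer, String> keyInfo = new HashMap<>();

        int[] lraIndexes = { 0, 1, 1 }; // the duplicate 1 simulates the error case
        for (int idx : lraIndexes) {
            if (keyInfo.get(Integer.valueOf(idx)) != null) {
                // The real code throws OptimizerException(msg, 2087, PigException.BUG).
                System.out.println("Found index:" + idx + " in multiple LocalRearrange operators.");
                continue;
            }
            keyInfo.put(idx, "projectStar/colsMap for " + idx);
        }
        System.out.println(keyInfo);
    }
}
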
Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
 Wed Apr 12 02:20:20 2017
@@ -34,20 +34,20 @@ import org.apache.pig.impl.plan.VisitorE
  */
 public class SparkPrinter extends SparkOpPlanVisitor {
 
-       private PrintStream mStream = null;
-       private boolean isVerbose = true;
+    private PrintStream mStream = null;
+    private boolean isVerbose = true;
 
-       public SparkPrinter(PrintStream ps, SparkOperPlan plan) {
-               super(plan, new DepthFirstWalker<SparkOperator, 
SparkOperPlan>(plan));
-               mStream = ps;
-               
mStream.println("#--------------------------------------------------");
-               mStream.println("# Spark Plan                                  
");
-               
mStream.println("#--------------------------------------------------");
-       }
+    public SparkPrinter(PrintStream ps, SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+        mStream = ps;
+        mStream.println("#--------------------------------------------------");
+        mStream.println("# Spark Plan                                  ");
+        mStream.println("#--------------------------------------------------");
+    }
 
-       public void setVerbose(boolean verbose) {
-               isVerbose = verbose;
-       }
+    public void setVerbose(boolean verbose) {
+        isVerbose = verbose;
+    }
 
     @Override
     public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {


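Finally, on the addSampleOperatorForSkewedJoin() hunk in SparkCompiler: the sampler parameters are read from the job Configuration with fallback defaults via getInt/getFloat/getLong. A minimal sketch of that lookup pattern against Hadoop's Configuration API, assuming hadoop-common on the classpath; the property keys and default values here are illustrative placeholders, not necessarily the exact PigConfiguration constants.

import org.apache.hadoop.conf.Configuration;

public class SamplerConfSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration(false); // no default resources

        // Placeholder keys; the patch uses PigConfiguration constants instead.
        int sampleRate = conf.getInt("pig.sksampler.samplerate", 17);
        float heapPerc = conf.getFloat("pig.skewedjoin.reduce.memusage", 0.3f);
        long totalMem  = conf.getLong("pig.skewedjoin.reduce.mem", -1L);

        System.out.println(sampleRate + " " + heapPerc + " " + totalMem);
    }
}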