Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Wed Apr 12 02:20:20 2017 @@ -25,8 +25,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Properties; import java.util.Map; +import java.util.Properties; import java.util.Random; import java.util.Set; @@ -112,53 +112,53 @@ public class SparkCompiler extends PhyPl private static final Log LOG = LogFactory.getLog(SparkCompiler.class); private PigContext pigContext; - private Properties pigProperties; + private Properties pigProperties; - // The physicalPlan that is being compiled - private PhysicalPlan physicalPlan; + // The physicalPlan that is being compiled + private PhysicalPlan physicalPlan; - // The physicalPlan of Spark Operators - private SparkOperPlan sparkPlan; + // The physicalPlan of Spark Operators + private SparkOperPlan sparkPlan; - private SparkOperator curSparkOp; + private SparkOperator curSparkOp; - private String scope; + private String scope; - private SparkOperator[] compiledInputs = null; + private SparkOperator[] compiledInputs = null; - private Map<OperatorKey, SparkOperator> splitsSeen; + private Map<OperatorKey, SparkOperator> splitsSeen; - private NodeIdGenerator nig; + private NodeIdGenerator nig; - private Map<PhysicalOperator, SparkOperator> phyToSparkOpMap; - private UDFFinder udfFinder; + private Map<PhysicalOperator, SparkOperator> phyToSparkOpMap; + private UDFFinder udfFinder; - public SparkCompiler(PhysicalPlan physicalPlan, PigContext pigContext) { - super(physicalPlan, - new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( - physicalPlan)); - this.physicalPlan = physicalPlan; - this.pigContext = pigContext; - this.pigProperties = pigContext.getProperties(); - this.sparkPlan = new SparkOperPlan(); - this.phyToSparkOpMap = new HashMap<PhysicalOperator, SparkOperator>(); - this.udfFinder = new UDFFinder(); - this.nig = NodeIdGenerator.getGenerator(); - this.splitsSeen = new HashMap<OperatorKey, SparkOperator>(); + public SparkCompiler(PhysicalPlan physicalPlan, PigContext pigContext) { + super(physicalPlan, + new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( + physicalPlan)); + this.physicalPlan = physicalPlan; + this.pigContext = pigContext; + this.pigProperties = pigContext.getProperties(); + this.sparkPlan = new SparkOperPlan(); + this.phyToSparkOpMap = new HashMap<PhysicalOperator, SparkOperator>(); + this.udfFinder = new UDFFinder(); + this.nig = NodeIdGenerator.getGenerator(); + this.splitsSeen = new HashMap<OperatorKey, SparkOperator>(); - } + } - public void compile() throws IOException, PlanException, VisitorException { - List<PhysicalOperator> roots = physicalPlan.getRoots(); - if ((roots == null) || (roots.size() <= 0)) { - int errCode = 2053; - String msg = "Internal error. Did not find roots in the physical physicalPlan."; - throw new SparkCompilerException(msg, errCode, PigException.BUG); - } - scope = roots.get(0).getOperatorKey().getScope(); - List<PhysicalOperator> leaves = physicalPlan.getLeaves(); + public void compile() throws IOException, PlanException, VisitorException { + List<PhysicalOperator> roots = physicalPlan.getRoots(); + if ((roots == null) || (roots.size() <= 0)) { + int errCode = 2053; + String msg = "Internal error. Did not find roots in the physical physicalPlan."; + throw new SparkCompilerException(msg, errCode, PigException.BUG); + } + scope = roots.get(0).getOperatorKey().getScope(); + List<PhysicalOperator> leaves = physicalPlan.getLeaves(); - if (!pigContext.inIllustrator) { + if (!pigContext.inIllustrator) { for (PhysicalOperator op : leaves) { if (!(op instanceof POStore)) { int errCode = 2025; @@ -171,321 +171,321 @@ public class SparkCompiler extends PhyPl } } - // get all stores and nativeSpark operators, sort them in order(operator - // id) - // and compile their plans - List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan, - POStore.class); - List<PONative> nativeSparks = PlanHelper.getPhysicalOperators( - physicalPlan, PONative.class); - List<PhysicalOperator> ops; - if (!pigContext.inIllustrator) { - ops = new ArrayList<PhysicalOperator>(stores.size() - + nativeSparks.size()); - ops.addAll(stores); - } else { - ops = new ArrayList<PhysicalOperator>(leaves.size() - + nativeSparks.size()); - ops.addAll(leaves); - } - ops.addAll(nativeSparks); - Collections.sort(ops); + // get all stores and nativeSpark operators, sort them in order(operator + // id) + // and compile their plans + List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan, + POStore.class); + List<PONative> nativeSparks = PlanHelper.getPhysicalOperators( + physicalPlan, PONative.class); + List<PhysicalOperator> ops; + if (!pigContext.inIllustrator) { + ops = new ArrayList<PhysicalOperator>(stores.size() + + nativeSparks.size()); + ops.addAll(stores); + } else { + ops = new ArrayList<PhysicalOperator>(leaves.size() + + nativeSparks.size()); + ops.addAll(leaves); + } + ops.addAll(nativeSparks); + Collections.sort(ops); - for (PhysicalOperator op : ops) { + for (PhysicalOperator op : ops) { if (LOG.isDebugEnabled()) LOG.debug("Starting compile of leaf-level operator " + op); compile(op); - } - } + } + } - /** - * Compiles the physicalPlan below op into a Spark Operator and stores it in - * curSparkOp. - * - * @param op - * @throws IOException - * @throws PlanException - * @throws VisitorException - */ - private void compile(PhysicalOperator op) throws IOException, - PlanException, VisitorException { - SparkOperator[] prevCompInp = compiledInputs; + /** + * Compiles the physicalPlan below op into a Spark Operator and stores it in + * curSparkOp. + * + * @param op + * @throws IOException + * @throws PlanException + * @throws VisitorException + */ + private void compile(PhysicalOperator op) throws IOException, + PlanException, VisitorException { + SparkOperator[] prevCompInp = compiledInputs; if (LOG.isDebugEnabled()) LOG.debug("Compiling physical operator " + op + ". Current spark operator is " + curSparkOp); - List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op); - if (op instanceof PONative) { - // the predecessor (store) has already been processed - // don't process it again - } else if (predecessors != null && predecessors.size() > 0) { - // When processing an entire script (multiquery), we can - // get into a situation where a load has - // predecessors. This means that it depends on some store - // earlier in the physicalPlan. We need to take that dependency - // and connect the respective Spark operators, while at the - // same time removing the connection between the Physical - // operators. That way the jobs will run in the right - // order. - if (op instanceof POLoad) { - - if (predecessors.size() != 1) { - int errCode = 2125; - String msg = "Expected at most one predecessor of load. Got " - + predecessors.size(); - throw new PlanException(msg, errCode, PigException.BUG); - } - - PhysicalOperator p = predecessors.get(0); - SparkOperator oper = null; - if (p instanceof POStore || p instanceof PONative) { - oper = phyToSparkOpMap.get(p); - } else { - int errCode = 2126; - String msg = "Predecessor of load should be a store or spark operator. Got " - + p.getClass(); - throw new PlanException(msg, errCode, PigException.BUG); - } - - // Need new operator - curSparkOp = getSparkOp(); - curSparkOp.add(op); - sparkPlan.add(curSparkOp); - physicalPlan.disconnect(op, p); - sparkPlan.connect(oper, curSparkOp); - phyToSparkOpMap.put(op, curSparkOp); - return; - } - - Collections.sort(predecessors); - compiledInputs = new SparkOperator[predecessors.size()]; - int i = -1; - for (PhysicalOperator pred : predecessors) { - if (pred instanceof POSplit - && splitsSeen.containsKey(pred.getOperatorKey())) { - compiledInputs[++i] = startNew( - ((POSplit) pred).getSplitStore(), - splitsSeen.get(pred.getOperatorKey())); - continue; - } - compile(pred); - compiledInputs[++i] = curSparkOp; - } - } else { - // No predecessors. Mostly a load. But this is where - // we start. We create a new sparkOp and add its first - // operator op. Also this should be added to the sparkPlan. - curSparkOp = getSparkOp(); - curSparkOp.add(op); - if (op != null && op instanceof POLoad) { - if (((POLoad) op).getLFile() != null - && ((POLoad) op).getLFile().getFuncSpec() != null) - curSparkOp.UDFs.add(((POLoad) op).getLFile().getFuncSpec() - .toString()); - } - sparkPlan.add(curSparkOp); - phyToSparkOpMap.put(op, curSparkOp); - return; - } - op.visit(this); - compiledInputs = prevCompInp; - } + List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op); + if (op instanceof PONative) { + // the predecessor (store) has already been processed + // don't process it again + } else if (predecessors != null && predecessors.size() > 0) { + // When processing an entire script (multiquery), we can + // get into a situation where a load has + // predecessors. This means that it depends on some store + // earlier in the physicalPlan. We need to take that dependency + // and connect the respective Spark operators, while at the + // same time removing the connection between the Physical + // operators. That way the jobs will run in the right + // order. + if (op instanceof POLoad) { + + if (predecessors.size() != 1) { + int errCode = 2125; + String msg = "Expected at most one predecessor of load. Got " + + predecessors.size(); + throw new PlanException(msg, errCode, PigException.BUG); + } + + PhysicalOperator p = predecessors.get(0); + SparkOperator oper = null; + if (p instanceof POStore || p instanceof PONative) { + oper = phyToSparkOpMap.get(p); + } else { + int errCode = 2126; + String msg = "Predecessor of load should be a store or spark operator. Got " + + p.getClass(); + throw new PlanException(msg, errCode, PigException.BUG); + } + + // Need new operator + curSparkOp = getSparkOp(); + curSparkOp.add(op); + sparkPlan.add(curSparkOp); + physicalPlan.disconnect(op, p); + sparkPlan.connect(oper, curSparkOp); + phyToSparkOpMap.put(op, curSparkOp); + return; + } + + Collections.sort(predecessors); + compiledInputs = new SparkOperator[predecessors.size()]; + int i = -1; + for (PhysicalOperator pred : predecessors) { + if (pred instanceof POSplit + && splitsSeen.containsKey(pred.getOperatorKey())) { + compiledInputs[++i] = startNew( + ((POSplit) pred).getSplitStore(), + splitsSeen.get(pred.getOperatorKey())); + continue; + } + compile(pred); + compiledInputs[++i] = curSparkOp; + } + } else { + // No predecessors. Mostly a load. But this is where + // we start. We create a new sparkOp and add its first + // operator op. Also this should be added to the sparkPlan. + curSparkOp = getSparkOp(); + curSparkOp.add(op); + if (op != null && op instanceof POLoad) { + if (((POLoad) op).getLFile() != null + && ((POLoad) op).getLFile().getFuncSpec() != null) + curSparkOp.UDFs.add(((POLoad) op).getLFile().getFuncSpec() + .toString()); + } + sparkPlan.add(curSparkOp); + phyToSparkOpMap.put(op, curSparkOp); + return; + } + op.visit(this); + compiledInputs = prevCompInp; + } - private SparkOperator getSparkOp() { - SparkOperator op = new SparkOperator(OperatorKey.genOpKey(scope)); + private SparkOperator getSparkOp() { + SparkOperator op = new SparkOperator(OperatorKey.genOpKey(scope)); if (LOG.isDebugEnabled()) LOG.debug("Created new Spark operator " + op); return op; - } + } + + public SparkOperPlan getSparkPlan() { + return sparkPlan; + } + + public void connectSoftLink() throws PlanException, IOException { + for (PhysicalOperator op : physicalPlan) { + if (physicalPlan.getSoftLinkPredecessors(op) != null) { + for (PhysicalOperator pred : physicalPlan + .getSoftLinkPredecessors(op)) { + SparkOperator from = phyToSparkOpMap.get(pred); + SparkOperator to = phyToSparkOpMap.get(op); + if (from == to) + continue; + if (sparkPlan.getPredecessors(to) == null + || !sparkPlan.getPredecessors(to).contains(from)) { + sparkPlan.connect(from, to); + } + } + } + } + } - public SparkOperPlan getSparkPlan() { - return sparkPlan; - } - - public void connectSoftLink() throws PlanException, IOException { - for (PhysicalOperator op : physicalPlan) { - if (physicalPlan.getSoftLinkPredecessors(op) != null) { - for (PhysicalOperator pred : physicalPlan - .getSoftLinkPredecessors(op)) { - SparkOperator from = phyToSparkOpMap.get(pred); - SparkOperator to = phyToSparkOpMap.get(op); - if (from == to) - continue; - if (sparkPlan.getPredecessors(to) == null - || !sparkPlan.getPredecessors(to).contains(from)) { - sparkPlan.connect(from, to); - } - } - } - } - } - - private SparkOperator startNew(FileSpec fSpec, SparkOperator old) - throws PlanException { - POLoad ld = getLoad(); - ld.setLFile(fSpec); - SparkOperator ret = getSparkOp(); - ret.add(ld); - sparkPlan.add(ret); - sparkPlan.connect(old, ret); - return ret; - } - - private POLoad getLoad() { - POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope))); - ld.setPc(pigContext); - ld.setIsTmpLoad(true); - return ld; - } - - @Override - public void visitSplit(POSplit op) throws VisitorException { - try { - FileSpec fSpec = op.getSplitStore(); - SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec); - sparkOp.setSplitter(true); - splitsSeen.put(op.getOperatorKey(), sparkOp); - curSparkOp = startNew(fSpec, sparkOp); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - public void visitDistinct(PODistinct op) throws VisitorException { - try { - addToPlan(op); + private SparkOperator startNew(FileSpec fSpec, SparkOperator old) + throws PlanException { + POLoad ld = getLoad(); + ld.setLFile(fSpec); + SparkOperator ret = getSparkOp(); + ret.add(ld); + sparkPlan.add(ret); + sparkPlan.connect(old, ret); + return ret; + } + + private POLoad getLoad() { + POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope))); + ld.setPc(pigContext); + ld.setIsTmpLoad(true); + return ld; + } + + @Override + public void visitSplit(POSplit op) throws VisitorException { + try { + FileSpec fSpec = op.getSplitStore(); + SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec); + sparkOp.setSplitter(true); + splitsSeen.put(op.getOperatorKey(), sparkOp); + curSparkOp = startNew(fSpec, sparkOp); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + public void visitDistinct(PODistinct op) throws VisitorException { + try { + addToPlan(op); phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - private SparkOperator endSingleInputPlanWithStr(FileSpec fSpec) - throws PlanException { - if (compiledInputs.length > 1) { - int errCode = 2023; - String msg = "Received a multi input physicalPlan when expecting only a single input one."; - throw new PlanException(msg, errCode, PigException.BUG); - } - SparkOperator sparkOp = compiledInputs[0]; // Load - POStore str = getStore(); - str.setSFile(fSpec); - sparkOp.physicalPlan.addAsLeaf(str); - return sparkOp; - } - - private POStore getStore() { - POStore st = new POStore(new OperatorKey(scope, - nig.getNextNodeId(scope))); - // mark store as tmp store. These could be removed by the - // optimizer, because it wasn't the user requesting it. - st.setIsTmpStore(true); - return st; - } - - @Override - public void visitLoad(POLoad op) throws VisitorException { - try { - addToPlan(op); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitNative(PONative op) throws VisitorException { - try { - SparkOperator nativesparkOpper = getNativeSparkOp( - op.getNativeMRjar(), op.getParams()); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + private SparkOperator endSingleInputPlanWithStr(FileSpec fSpec) + throws PlanException { + if (compiledInputs.length > 1) { + int errCode = 2023; + String msg = "Received a multi input physicalPlan when expecting only a single input one."; + throw new PlanException(msg, errCode, PigException.BUG); + } + SparkOperator sparkOp = compiledInputs[0]; // Load + POStore str = getStore(); + str.setSFile(fSpec); + sparkOp.physicalPlan.addAsLeaf(str); + return sparkOp; + } + + private POStore getStore() { + POStore st = new POStore(new OperatorKey(scope, + nig.getNextNodeId(scope))); + // mark store as tmp store. These could be removed by the + // optimizer, because it wasn't the user requesting it. + st.setIsTmpStore(true); + return st; + } + + @Override + public void visitLoad(POLoad op) throws VisitorException { + try { + addToPlan(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitNative(PONative op) throws VisitorException { + try { + SparkOperator nativesparkOpper = getNativeSparkOp( + op.getNativeMRjar(), op.getParams()); nativesparkOpper.markNative(); - sparkPlan.add(nativesparkOpper); - sparkPlan.connect(curSparkOp, nativesparkOpper); - phyToSparkOpMap.put(op, nativesparkOpper); - curSparkOp = nativesparkOpper; - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - private NativeSparkOperator getNativeSparkOp(String sparkJar, - String[] parameters) { - return new NativeSparkOperator(new OperatorKey(scope, - nig.getNextNodeId(scope)), sparkJar, parameters); - } - - @Override - public void visitStore(POStore op) throws VisitorException { - try { - addToPlan(op); - phyToSparkOpMap.put(op, curSparkOp); - if (op.getSFile() != null && op.getSFile().getFuncSpec() != null) - curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString()); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitFilter(POFilter op) throws VisitorException { - try { - addToPlan(op); - processUDFs(op.getPlan()); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitCross(POCross op) throws VisitorException { - try { - addToPlan(op); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitStream(POStream op) throws VisitorException { - try { - addToPlan(op); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitSort(POSort op) throws VisitorException { - try { + sparkPlan.add(nativesparkOpper); + sparkPlan.connect(curSparkOp, nativesparkOpper); + phyToSparkOpMap.put(op, nativesparkOpper); + curSparkOp = nativesparkOpper; + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + private NativeSparkOperator getNativeSparkOp(String sparkJar, + String[] parameters) { + return new NativeSparkOperator(new OperatorKey(scope, + nig.getNextNodeId(scope)), sparkJar, parameters); + } + + @Override + public void visitStore(POStore op) throws VisitorException { + try { + addToPlan(op); + phyToSparkOpMap.put(op, curSparkOp); + if (op.getSFile() != null && op.getSFile().getFuncSpec() != null) + curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString()); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitFilter(POFilter op) throws VisitorException { + try { + addToPlan(op); + processUDFs(op.getPlan()); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitCross(POCross op) throws VisitorException { + try { + addToPlan(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitStream(POStream op) throws VisitorException { + try { + addToPlan(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitSort(POSort op) throws VisitorException { + try { addToPlan(op); POSort sort = op; long limit = sort.getLimit(); @@ -496,143 +496,143 @@ public class SparkCompiler extends PhyPl curSparkOp.markLimitAfterSort(); } phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitLimit(POLimit op) throws VisitorException { - try { - addToPlan(op); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitLimit(POLimit op) throws VisitorException { + try { + addToPlan(op); curSparkOp.markLimit(); phyToSparkOpMap.put(op, curSparkOp); } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitLocalRearrange(POLocalRearrange op) - throws VisitorException { - try { - addToPlan(op); - List<PhysicalPlan> plans = op.getPlans(); - if (plans != null) - for (PhysicalPlan ep : plans) - processUDFs(ep); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitCollectedGroup(POCollectedGroup op) - throws VisitorException { - List<PhysicalOperator> roots = curSparkOp.physicalPlan.getRoots(); - if (roots.size() != 1) { - int errCode = 2171; - String errMsg = "Expected one but found more then one root physical operator in physical physicalPlan."; - throw new SparkCompilerException(errMsg, errCode, PigException.BUG); - } - - PhysicalOperator phyOp = roots.get(0); - if (!(phyOp instanceof POLoad)) { - int errCode = 2172; - String errMsg = "Expected physical operator at root to be POLoad. Found : " - + phyOp.getClass().getCanonicalName(); - throw new SparkCompilerException(errMsg, errCode, PigException.BUG); - } - - LoadFunc loadFunc = ((POLoad) phyOp).getLoadFunc(); - try { - if (!(CollectableLoadFunc.class.isAssignableFrom(loadFunc - .getClass()))) { - int errCode = 2249; - throw new SparkCompilerException( - "While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", - errCode); - } - ((CollectableLoadFunc) loadFunc).ensureAllKeyInstancesInSameSplit(); - } catch (SparkCompilerException e) { - throw (e); - } catch (IOException e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - - try { - addToPlan(op); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitPOForEach(POForEach op) throws VisitorException { - try { - addToPlan(op); - List<PhysicalPlan> plans = op.getInputPlans(); - if (plans != null) { - for (PhysicalPlan ep : plans) { - processUDFs(ep); - } - } - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitCounter(POCounter op) throws VisitorException { - try { - addToPlan(op); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitRank(PORank op) throws VisitorException { - try { - addToPlan(op); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitGlobalRearrange(POGlobalRearrange op) - throws VisitorException { - try { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitLocalRearrange(POLocalRearrange op) + throws VisitorException { + try { + addToPlan(op); + List<PhysicalPlan> plans = op.getPlans(); + if (plans != null) + for (PhysicalPlan ep : plans) + processUDFs(ep); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitCollectedGroup(POCollectedGroup op) + throws VisitorException { + List<PhysicalOperator> roots = curSparkOp.physicalPlan.getRoots(); + if (roots.size() != 1) { + int errCode = 2171; + String errMsg = "Expected one but found more then one root physical operator in physical physicalPlan."; + throw new SparkCompilerException(errMsg, errCode, PigException.BUG); + } + + PhysicalOperator phyOp = roots.get(0); + if (!(phyOp instanceof POLoad)) { + int errCode = 2172; + String errMsg = "Expected physical operator at root to be POLoad. Found : " + + phyOp.getClass().getCanonicalName(); + throw new SparkCompilerException(errMsg, errCode, PigException.BUG); + } + + LoadFunc loadFunc = ((POLoad) phyOp).getLoadFunc(); + try { + if (!(CollectableLoadFunc.class.isAssignableFrom(loadFunc + .getClass()))) { + int errCode = 2249; + throw new SparkCompilerException( + "While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", + errCode); + } + ((CollectableLoadFunc) loadFunc).ensureAllKeyInstancesInSameSplit(); + } catch (SparkCompilerException e) { + throw (e); + } catch (IOException e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + + try { + addToPlan(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitPOForEach(POForEach op) throws VisitorException { + try { + addToPlan(op); + List<PhysicalPlan> plans = op.getInputPlans(); + if (plans != null) { + for (PhysicalPlan ep : plans) { + processUDFs(ep); + } + } + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitCounter(POCounter op) throws VisitorException { + try { + addToPlan(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitRank(PORank op) throws VisitorException { + try { + addToPlan(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitGlobalRearrange(POGlobalRearrange op) + throws VisitorException { + try { POGlobalRearrangeSpark glbOp = new POGlobalRearrangeSpark(op); addToPlan(glbOp); if (op.isCross()) { @@ -641,50 +641,50 @@ public class SparkCompiler extends PhyPl curSparkOp.customPartitioner = op.getCustomPartitioner(); phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitPackage(POPackage op) throws VisitorException { - try { - addToPlan(op); - phyToSparkOpMap.put(op, curSparkOp); - if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) { - curSparkOp.markRegularJoin(); - } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) { - if (op.getNumInps() == 1) { - curSparkOp.markGroupBy(); - } else if (op.getNumInps() > 1) { - curSparkOp.markCogroup(); - } - } - - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitUnion(POUnion op) throws VisitorException { - try { - addToPlan(op); - phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitPackage(POPackage op) throws VisitorException { + try { + addToPlan(op); + phyToSparkOpMap.put(op, curSparkOp); + if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) { + curSparkOp.markRegularJoin(); + } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) { + if (op.getNumInps() == 1) { + curSparkOp.markGroupBy(); + } else if (op.getNumInps() > 1) { + curSparkOp.markCogroup(); + } + } + + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitUnion(POUnion op) throws VisitorException { + try { + addToPlan(op); + phyToSparkOpMap.put(op, curSparkOp); curSparkOp.markUnion(); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } /** * currently use regular join to replace skewedJoin @@ -725,292 +725,292 @@ public class SparkCompiler extends PhyPl @Override public void visitFRJoin(POFRJoin op) throws VisitorException { - try { - curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment())); - for (int i = 0; i < compiledInputs.length; i++) { - SparkOperator sparkOperator = compiledInputs[i]; - if (curSparkOp.equals(sparkOperator)) { - continue; - } - - OperatorKey broadcastKey = new OperatorKey(scope, nig.getNextNodeId(scope)); - POBroadcastSpark poBroadcastSpark = new POBroadcastSpark(broadcastKey); - poBroadcastSpark.setBroadcastedVariableName(broadcastKey.toString()); - - sparkOperator.physicalPlan.addAsLeaf(poBroadcastSpark); - } - - POFRJoinSpark poFRJoinSpark = new POFRJoinSpark(op); - addToPlan(poFRJoinSpark); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException { - try { - if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2){ - int errCode=1101; - throw new SparkCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode); - } - - curSparkOp = phyToSparkOpMap.get(joinOp.getInputs().get(0)); - SparkOperator rightSparkOp; - if(curSparkOp.equals(compiledInputs[0])) { - rightSparkOp = compiledInputs[1]; - } else { - rightSparkOp = compiledInputs[0]; - } - - PhysicalPlan rightPipelinePlan; - PhysicalPlan rightPhyPlan = rightSparkOp.physicalPlan; - if (rightPhyPlan.getRoots().size() != 1) { - int errCode = 2171; - String errMsg = "Expected one but found more then one root physical operator in physical plan."; - throw new SparkCompilerException(errMsg,errCode); - } - PhysicalOperator rightPhyLoader = rightPhyPlan.getRoots().get(0); - if (!(rightPhyLoader instanceof POLoad)) { - int errCode = 2172; - String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightPhyLoader.getClass().getCanonicalName(); - throw new SparkCompilerException(errMsg,errCode); - } - if (rightPhyPlan.getSuccessors(rightPhyLoader) == null || rightPhyPlan.getSuccessors(rightPhyLoader).isEmpty()) { - // Load - Join case. - rightPipelinePlan = null; - } else{ // We got something on right side. Yank it and set it as inner plan of right input. - rightPipelinePlan = rightPhyPlan.clone(); - PhysicalOperator root = rightPipelinePlan.getRoots().get(0); - rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0)); - rightPipelinePlan.remove(root); - rightPhyPlan.trimBelow(rightPhyLoader); - } - - joinOp.setupRightPipeline(rightPipelinePlan); - rightSparkOp.setRequestedParallelism(1); // for indexing job - - POLoad rightLoader = (POLoad)rightSparkOp.physicalPlan.getRoots().get(0); - joinOp.setSignature(rightLoader.getSignature()); - LoadFunc rightLoadFunc = rightLoader.getLoadFunc(); - - if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) { - joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec()); - joinOp.setRightInputFileName(rightLoader.getLFile().getFileName()); - curSparkOp.UDFs.add(rightLoader.getLFile().getFuncSpec().toString()); - - // we don't need the right rightSparkOp since - // the right loader is an IndexableLoadFunc which can handle the index itself - sparkPlan.remove(rightSparkOp); - if(rightSparkOp == compiledInputs[0]) { - compiledInputs[0] = null; - } else if(rightSparkOp == compiledInputs[1]) { - compiledInputs[1] = null; - } - - // validate that the join keys in merge join are only - // simple column projections or '*' and not expression - expressions - // cannot be handled when the index is built by the storage layer on the sorted - // data when the sorted data (and corresponding index) is written. - // So merge join will be restricted not have expressions as join keys - int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2 - for(int i = 0; i < numInputs; i++) { - List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i); - for (PhysicalPlan keyPlan : keyPlans) { - for(PhysicalOperator op : keyPlan) { - if(!(op instanceof POProject)) { - int errCode = 1106; - String errMsg = "Merge join is possible only for simple column or '*' join keys when using " + - rightLoader.getLFile().getFuncSpec() + " as the loader"; - throw new SparkCompilerException(errMsg, errCode, PigException.INPUT); - } - } - } - } - - } else { - //Replacing POLoad with indexer is disabled for 'merge-sparse' joins. While - //this feature would be useful, the current implementation of DefaultIndexableLoader - //is not designed to handle multiple calls to seekNear. Specifically, it rereads the entire index - //for each call. Some refactoring of this class is required - and then the check below could be removed. - if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) { - int errCode = 1104; - String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " + - "The specified loader " + rightLoadFunc + " doesn't implement it"; - throw new SparkCompilerException(errMsg,errCode); - } - - // Replace POLoad with indexer. - if (! (OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass()))){ - int errCode = 1104; - String errMsg = "Right input of merge-join must implement " + - "OrderedLoadFunc interface. The specified loader " - + rightLoadFunc + " doesn't implement it"; - throw new SparkCompilerException(errMsg,errCode); - } - - String[] indexerArgs = new String[6]; - List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1); - FileSpec origRightLoaderFileSpec = rightLoader.getLFile(); - - indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); - indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans); - indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan); - indexerArgs[3] = rightLoader.getSignature(); - indexerArgs[4] = rightLoader.getOperatorKey().scope; - indexerArgs[5] = Boolean.toString(true); - - FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)); - rightLoader.setLFile(lFile); - - // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer - rightSparkOp.useTypedComparator(true); - POStore idxStore = getStore(); - FileSpec idxStrFile = getTempFileSpec(); - idxStore.setSFile(idxStrFile); - rightSparkOp.physicalPlan.addAsLeaf(idxStore); - rightSparkOp.markIndexer(); - - curSparkOp.UDFs.add(origRightLoaderFileSpec.getFuncSpec().toString()); - - // We want to ensure indexing job runs prior to actual join job. - // So, connect them in order. - sparkPlan.connect(rightSparkOp, curSparkOp); - - // set up the DefaultIndexableLoader for the join operator - String[] defaultIndexableLoaderArgs = new String[5]; - defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); - defaultIndexableLoaderArgs[1] = idxStrFile.getFileName(); - defaultIndexableLoaderArgs[2] = idxStrFile.getFuncSpec().toString(); - defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope; - defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName(); - joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs))); - joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName()); - - joinOp.setIndexFile(idxStrFile.getFileName()); - } - - curSparkOp.physicalPlan.addAsLeaf(joinOp); - phyToSparkOpMap.put(joinOp, curSparkOp); - - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " - + joinOp.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - private void processUDFs(PhysicalPlan plan) throws VisitorException { - if (plan != null) { - // Process Scalars (UDF with referencedOperators) - ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan); - scalarPhyFinder.visit(); - curSparkOp.scalars.addAll(scalarPhyFinder.getScalars()); - - // Process UDFs - udfFinder.setPlan(plan); - udfFinder.visit(); - curSparkOp.UDFs.addAll(udfFinder.getUDFs()); - } - } - - private void addToPlan(PhysicalOperator op) throws PlanException, - IOException { - SparkOperator sparkOp = null; - if (compiledInputs.length == 1) { - sparkOp = compiledInputs[0]; - } else { - sparkOp = merge(compiledInputs); - } - sparkOp.physicalPlan.addAsLeaf(op); - curSparkOp = sparkOp; - } - - private SparkOperator merge(SparkOperator[] compiledInputs) - throws PlanException { - SparkOperator ret = getSparkOp(); - sparkPlan.add(ret); + try { + curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment())); + for (int i = 0; i < compiledInputs.length; i++) { + SparkOperator sparkOperator = compiledInputs[i]; + if (curSparkOp.equals(sparkOperator)) { + continue; + } + + OperatorKey broadcastKey = new OperatorKey(scope, nig.getNextNodeId(scope)); + POBroadcastSpark poBroadcastSpark = new POBroadcastSpark(broadcastKey); + poBroadcastSpark.setBroadcastedVariableName(broadcastKey.toString()); + + sparkOperator.physicalPlan.addAsLeaf(poBroadcastSpark); + } + + POFRJoinSpark poFRJoinSpark = new POFRJoinSpark(op); + addToPlan(poFRJoinSpark); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException { + try { + if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2){ + int errCode=1101; + throw new SparkCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode); + } + + curSparkOp = phyToSparkOpMap.get(joinOp.getInputs().get(0)); + SparkOperator rightSparkOp; + if(curSparkOp.equals(compiledInputs[0])) { + rightSparkOp = compiledInputs[1]; + } else { + rightSparkOp = compiledInputs[0]; + } + + PhysicalPlan rightPipelinePlan; + PhysicalPlan rightPhyPlan = rightSparkOp.physicalPlan; + if (rightPhyPlan.getRoots().size() != 1) { + int errCode = 2171; + String errMsg = "Expected one but found more then one root physical operator in physical plan."; + throw new SparkCompilerException(errMsg,errCode); + } + PhysicalOperator rightPhyLoader = rightPhyPlan.getRoots().get(0); + if (!(rightPhyLoader instanceof POLoad)) { + int errCode = 2172; + String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightPhyLoader.getClass().getCanonicalName(); + throw new SparkCompilerException(errMsg,errCode); + } + if (rightPhyPlan.getSuccessors(rightPhyLoader) == null || rightPhyPlan.getSuccessors(rightPhyLoader).isEmpty()) { + // Load - Join case. + rightPipelinePlan = null; + } else{ // We got something on right side. Yank it and set it as inner plan of right input. + rightPipelinePlan = rightPhyPlan.clone(); + PhysicalOperator root = rightPipelinePlan.getRoots().get(0); + rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0)); + rightPipelinePlan.remove(root); + rightPhyPlan.trimBelow(rightPhyLoader); + } + + joinOp.setupRightPipeline(rightPipelinePlan); + rightSparkOp.setRequestedParallelism(1); // for indexing job + + POLoad rightLoader = (POLoad)rightSparkOp.physicalPlan.getRoots().get(0); + joinOp.setSignature(rightLoader.getSignature()); + LoadFunc rightLoadFunc = rightLoader.getLoadFunc(); + + if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) { + joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec()); + joinOp.setRightInputFileName(rightLoader.getLFile().getFileName()); + curSparkOp.UDFs.add(rightLoader.getLFile().getFuncSpec().toString()); + + // we don't need the right rightSparkOp since + // the right loader is an IndexableLoadFunc which can handle the index itself + sparkPlan.remove(rightSparkOp); + if(rightSparkOp == compiledInputs[0]) { + compiledInputs[0] = null; + } else if(rightSparkOp == compiledInputs[1]) { + compiledInputs[1] = null; + } + + // validate that the join keys in merge join are only + // simple column projections or '*' and not expression - expressions + // cannot be handled when the index is built by the storage layer on the sorted + // data when the sorted data (and corresponding index) is written. + // So merge join will be restricted not have expressions as join keys + int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2 + for(int i = 0; i < numInputs; i++) { + List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i); + for (PhysicalPlan keyPlan : keyPlans) { + for(PhysicalOperator op : keyPlan) { + if(!(op instanceof POProject)) { + int errCode = 1106; + String errMsg = "Merge join is possible only for simple column or '*' join keys when using " + + rightLoader.getLFile().getFuncSpec() + " as the loader"; + throw new SparkCompilerException(errMsg, errCode, PigException.INPUT); + } + } + } + } + + } else { + //Replacing POLoad with indexer is disabled for 'merge-sparse' joins. While + //this feature would be useful, the current implementation of DefaultIndexableLoader + //is not designed to handle multiple calls to seekNear. Specifically, it rereads the entire index + //for each call. Some refactoring of this class is required - and then the check below could be removed. + if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) { + int errCode = 1104; + String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " + + "The specified loader " + rightLoadFunc + " doesn't implement it"; + throw new SparkCompilerException(errMsg,errCode); + } + + // Replace POLoad with indexer. + if (! (OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass()))){ + int errCode = 1104; + String errMsg = "Right input of merge-join must implement " + + "OrderedLoadFunc interface. The specified loader " + + rightLoadFunc + " doesn't implement it"; + throw new SparkCompilerException(errMsg,errCode); + } + + String[] indexerArgs = new String[6]; + List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1); + FileSpec origRightLoaderFileSpec = rightLoader.getLFile(); + + indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); + indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans); + indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan); + indexerArgs[3] = rightLoader.getSignature(); + indexerArgs[4] = rightLoader.getOperatorKey().scope; + indexerArgs[5] = Boolean.toString(true); + + FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)); + rightLoader.setLFile(lFile); + + // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer + rightSparkOp.useTypedComparator(true); + POStore idxStore = getStore(); + FileSpec idxStrFile = getTempFileSpec(); + idxStore.setSFile(idxStrFile); + rightSparkOp.physicalPlan.addAsLeaf(idxStore); + rightSparkOp.markIndexer(); + + curSparkOp.UDFs.add(origRightLoaderFileSpec.getFuncSpec().toString()); + + // We want to ensure indexing job runs prior to actual join job. + // So, connect them in order. + sparkPlan.connect(rightSparkOp, curSparkOp); + + // set up the DefaultIndexableLoader for the join operator + String[] defaultIndexableLoaderArgs = new String[5]; + defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); + defaultIndexableLoaderArgs[1] = idxStrFile.getFileName(); + defaultIndexableLoaderArgs[2] = idxStrFile.getFuncSpec().toString(); + defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope; + defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName(); + joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs))); + joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName()); + + joinOp.setIndexFile(idxStrFile.getFileName()); + } + + curSparkOp.physicalPlan.addAsLeaf(joinOp); + phyToSparkOpMap.put(joinOp, curSparkOp); + + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + joinOp.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + private void processUDFs(PhysicalPlan plan) throws VisitorException { + if (plan != null) { + // Process Scalars (UDF with referencedOperators) + ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan); + scalarPhyFinder.visit(); + curSparkOp.scalars.addAll(scalarPhyFinder.getScalars()); + + // Process UDFs + udfFinder.setPlan(plan); + udfFinder.visit(); + curSparkOp.UDFs.addAll(udfFinder.getUDFs()); + } + } + + private void addToPlan(PhysicalOperator op) throws PlanException, + IOException { + SparkOperator sparkOp = null; + if (compiledInputs.length == 1) { + sparkOp = compiledInputs[0]; + } else { + sparkOp = merge(compiledInputs); + } + sparkOp.physicalPlan.addAsLeaf(op); + curSparkOp = sparkOp; + } + + private SparkOperator merge(SparkOperator[] compiledInputs) + throws PlanException { + SparkOperator ret = getSparkOp(); + sparkPlan.add(ret); Set<SparkOperator> toBeConnected = new HashSet<SparkOperator>(); - List<SparkOperator> toBeRemoved = new ArrayList<SparkOperator>(); + List<SparkOperator> toBeRemoved = new ArrayList<SparkOperator>(); - List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>(); + List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>(); - for (SparkOperator sparkOp : compiledInputs) { + for (SparkOperator sparkOp : compiledInputs) { if (LOG.isDebugEnabled()) LOG.debug("Merging Spark operator" + sparkOp); - toBeRemoved.add(sparkOp); - toBeMerged.add(sparkOp.physicalPlan); - List<SparkOperator> predecessors = sparkPlan - .getPredecessors(sparkOp); - if (predecessors != null) { - for (SparkOperator predecessorSparkOp : predecessors) { - toBeConnected.add(predecessorSparkOp); - } - } - } - merge(ret.physicalPlan, toBeMerged); - - Iterator<SparkOperator> it = toBeConnected.iterator(); - while (it.hasNext()) - sparkPlan.connect(it.next(), ret); - for (SparkOperator removeSparkOp : toBeRemoved) { - if (removeSparkOp.requestedParallelism > ret.requestedParallelism) - ret.requestedParallelism = removeSparkOp.requestedParallelism; - for (String udf : removeSparkOp.UDFs) { - if (!ret.UDFs.contains(udf)) - ret.UDFs.add(udf); - } - // We also need to change scalar marking - for (PhysicalOperator physOp : removeSparkOp.scalars) { - if (!ret.scalars.contains(physOp)) { - ret.scalars.add(physOp); - } - } - - if(removeSparkOp.getCrossKeys()!=null){ - for(String crossKey: removeSparkOp.getCrossKeys()) - ret.addCrossKey(crossKey); - } - - - Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>(); - for (Map.Entry<PhysicalOperator, SparkOperator> entry : phyToSparkOpMap - .entrySet()) { - if (entry.getValue() == removeSparkOp) { - opsToChange.add(entry.getKey()); - } - } - for (PhysicalOperator op : opsToChange) { - phyToSparkOpMap.put(op, ret); - } - - sparkPlan.remove(removeSparkOp); - } - return ret; - } - - /** - * The merge of a list of plans into a single physicalPlan - * - * @param <O> - * @param <E> - * @param finPlan - * - Final Plan into which the list of plans is merged - * @param plans - * - list of plans to be merged - * @throws PlanException - */ - private <O extends Operator<?>, E extends OperatorPlan<O>> void merge( - E finPlan, List<E> plans) throws PlanException { - for (E e : plans) { - finPlan.merge(e); - } - } + toBeRemoved.add(sparkOp); + toBeMerged.add(sparkOp.physicalPlan); + List<SparkOperator> predecessors = sparkPlan + .getPredecessors(sparkOp); + if (predecessors != null) { + for (SparkOperator predecessorSparkOp : predecessors) { + toBeConnected.add(predecessorSparkOp); + } + } + } + merge(ret.physicalPlan, toBeMerged); + + Iterator<SparkOperator> it = toBeConnected.iterator(); + while (it.hasNext()) + sparkPlan.connect(it.next(), ret); + for (SparkOperator removeSparkOp : toBeRemoved) { + if (removeSparkOp.requestedParallelism > ret.requestedParallelism) + ret.requestedParallelism = removeSparkOp.requestedParallelism; + for (String udf : removeSparkOp.UDFs) { + if (!ret.UDFs.contains(udf)) + ret.UDFs.add(udf); + } + // We also need to change scalar marking + for (PhysicalOperator physOp : removeSparkOp.scalars) { + if (!ret.scalars.contains(physOp)) { + ret.scalars.add(physOp); + } + } + + if(removeSparkOp.getCrossKeys()!=null){ + for(String crossKey: removeSparkOp.getCrossKeys()) + ret.addCrossKey(crossKey); + } + + + Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>(); + for (Map.Entry<PhysicalOperator, SparkOperator> entry : phyToSparkOpMap + .entrySet()) { + if (entry.getValue() == removeSparkOp) { + opsToChange.add(entry.getKey()); + } + } + for (PhysicalOperator op : opsToChange) { + phyToSparkOpMap.put(op, ret); + } + + sparkPlan.remove(removeSparkOp); + } + return ret; + } + + /** + * The merge of a list of plans into a single physicalPlan + * + * @param <O> + * @param <E> + * @param finPlan + * - Final Plan into which the list of plans is merged + * @param plans + * - list of plans to be merged + * @throws PlanException + */ + private <O extends Operator<?>, E extends OperatorPlan<O>> void merge( + E finPlan, List<E> plans) throws PlanException { + for (E e : plans) { + finPlan.merge(e); + } + } @Override public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException { @@ -1200,24 +1200,24 @@ public class SparkCompiler extends PhyPl /** * build a POPoissonSampleSpark operator for SkewedJoin's sampling job */ - private void addSampleOperatorForSkewedJoin(SparkOperator sampleSparkOp) - throws PlanException { - Configuration conf = ConfigurationUtil.toConfiguration(pigProperties); - int sampleRate = conf.getInt( - PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, - POPoissonSampleSpark.DEFAULT_SAMPLE_RATE); - float heapPerc = conf.getFloat( - PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE, - PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE); - long totalMemory = conf.getLong( - PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1); - - POPoissonSampleSpark poSample = new POPoissonSampleSpark( - new OperatorKey(scope, nig.getNextNodeId(scope)), -1, - sampleRate, heapPerc, totalMemory); + private void addSampleOperatorForSkewedJoin(SparkOperator sampleSparkOp) + throws PlanException { + Configuration conf = ConfigurationUtil.toConfiguration(pigProperties); + int sampleRate = conf.getInt( + PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, + POPoissonSampleSpark.DEFAULT_SAMPLE_RATE); + float heapPerc = conf.getFloat( + PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE, + PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE); + long totalMemory = conf.getLong( + PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1); + + POPoissonSampleSpark poSample = new POPoissonSampleSpark( + new OperatorKey(scope, nig.getNextNodeId(scope)), -1, + sampleRate, heapPerc, totalMemory); - sampleSparkOp.physicalPlan.addAsLeaf(poSample); - } + sampleSparkOp.physicalPlan.addAsLeaf(poSample); + } private SparkOperator getSortJob( POSort sort, @@ -1480,17 +1480,17 @@ public class SparkCompiler extends PhyPl throw new PlanException(msg, errCode, PigException.BUG); } - /** - * Add POBroadcastSpark operator to broadcast key distribution for SkewedJoin's sampling job - * @param sampleSparkOp - * @throws PlanException - */ - private void buildBroadcastForSkewedJoin(SparkOperator sampleSparkOp, String pigKeyDistFile) throws PlanException { - - POBroadcastSpark poBroadcast = new POBroadcastSpark(new OperatorKey(scope, nig.getNextNodeId(scope))); - poBroadcast.setBroadcastedVariableName(pigKeyDistFile); - sampleSparkOp.physicalPlan.addAsLeaf(poBroadcast); - } + /** + * Add POBroadcastSpark operator to broadcast key distribution for SkewedJoin's sampling job + * @param sampleSparkOp + * @throws PlanException + */ + private void buildBroadcastForSkewedJoin(SparkOperator sampleSparkOp, String pigKeyDistFile) throws PlanException { + + POBroadcastSpark poBroadcast = new POBroadcastSpark(new OperatorKey(scope, nig.getNextNodeId(scope))); + poBroadcast.setBroadcastedVariableName(pigKeyDistFile); + sampleSparkOp.physicalPlan.addAsLeaf(poBroadcast); + } /** * Create Sampling job for skewed join.
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java Wed Apr 12 02:20:20 2017 @@ -25,17 +25,15 @@ import org.apache.pig.impl.plan.VisitorE * A visitor for the SparkOperPlan class */ public class SparkOpPlanVisitor extends - PlanVisitor<SparkOperator, SparkOperPlan> { + PlanVisitor<SparkOperator, SparkOperPlan> { - public SparkOpPlanVisitor(SparkOperPlan plan, - PlanWalker<SparkOperator, SparkOperPlan> walker) { - super(plan, walker); - // TODO Auto-generated constructor stub - } + public SparkOpPlanVisitor(SparkOperPlan plan, + PlanWalker<SparkOperator, SparkOperPlan> walker) { + super(plan, walker); + } - public void visitSparkOp(SparkOperator sparkOperator) - throws VisitorException { - // TODO Auto-generated method stub - } + public void visitSparkOp(SparkOperator sparkOperator) + throws VisitorException { + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Wed Apr 12 02:20:20 2017 @@ -35,18 +35,18 @@ import org.apache.pig.impl.util.MultiMap * execute in spark. */ public class SparkOperator extends Operator<SparkOpPlanVisitor> { - private static enum OPER_FEATURE { - NONE, - // Indicate if this job is a sampling job - SAMPLER, - // Indicate if this job is a merge indexer - INDEXER, - // Indicate if this job is a group by job - GROUPBY, - // Indicate if this job is a cogroup job - COGROUP, - // Indicate if this job is a regular join job - HASHJOIN, + private static enum OPER_FEATURE { + NONE, + // Indicate if this job is a sampling job + SAMPLER, + // Indicate if this job is a merge indexer + INDEXER, + // Indicate if this job is a group by job + GROUPBY, + // Indicate if this job is a cogroup job + COGROUP, + // Indicate if this job is a regular join job + HASHJOIN, // Indicate if this job is a union job UNION, // Indicate if this job is a native job @@ -55,32 +55,32 @@ public class SparkOperator extends Opera LIMIT, // Indicate if this job is a limit job after sort LIMIT_AFTER_SORT; - }; + }; - public PhysicalPlan physicalPlan; + public PhysicalPlan physicalPlan; - public Set<String> UDFs; + public Set<String> UDFs; - /* Name of the Custom Partitioner used */ - public String customPartitioner = null; + /* Name of the Custom Partitioner used */ + public String customPartitioner = null; - public Set<PhysicalOperator> scalars; + public Set<PhysicalOperator> scalars; - public int requestedParallelism = -1; + public int requestedParallelism = -1; private BitSet feature = new BitSet(); - private boolean splitter = false; + private boolean splitter = false; - // Name of the partition file generated by sampling process, - // Used by Skewed Join - private String skewedJoinPartitionFile; + // Name of the partition file generated by sampling process, + // Used by Skewed Join + private String skewedJoinPartitionFile; - private boolean usingTypedComparator = false; + private boolean usingTypedComparator = false; - private boolean combineSmallSplits = true; + private boolean combineSmallSplits = true; - private List<String> crossKeys = null; + private List<String> crossKeys = null; private MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionMap = new MultiMap<OperatorKey, OperatorKey>(); @@ -100,126 +100,126 @@ public class SparkOperator extends Opera scalars = new HashSet<PhysicalOperator>(); } - @Override - public boolean supportsMultipleInputs() { - return true; - } - - @Override - public boolean supportsMultipleOutputs() { - return true; - } - - @Override - public String name() { - String udfStr = getUDFsAsStr(); - StringBuilder sb = new StringBuilder("Spark" + "(" - + requestedParallelism + (udfStr.equals("") ? "" : ",") - + udfStr + ")" + " - " + mKey.toString()); - return sb.toString(); - } - - private String getUDFsAsStr() { - StringBuilder sb = new StringBuilder(); - if (UDFs != null && UDFs.size() > 0) { - for (String str : UDFs) { - sb.append(str.substring(str.lastIndexOf('.') + 1)); - sb.append(','); - } - sb.deleteCharAt(sb.length() - 1); - } - return sb.toString(); - } - - public void add(PhysicalOperator physicalOper) { - this.physicalPlan.add(physicalOper); - } - - @Override - public void visit(SparkOpPlanVisitor v) throws VisitorException { - v.visitSparkOp(this); - } - - public void addCrossKey(String key) { - if (crossKeys == null) { - crossKeys = new ArrayList<String>(); - } - crossKeys.add(key); - } - - public List<String> getCrossKeys() { - return crossKeys; - } + @Override + public boolean supportsMultipleInputs() { + return true; + } + + @Override + public boolean supportsMultipleOutputs() { + return true; + } + + @Override + public String name() { + String udfStr = getUDFsAsStr(); + StringBuilder sb = new StringBuilder("Spark" + "(" + + requestedParallelism + (udfStr.equals("") ? "" : ",") + + udfStr + ")" + " - " + mKey.toString()); + return sb.toString(); + } + + private String getUDFsAsStr() { + StringBuilder sb = new StringBuilder(); + if (UDFs != null && UDFs.size() > 0) { + for (String str : UDFs) { + sb.append(str.substring(str.lastIndexOf('.') + 1)); + sb.append(','); + } + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + public void add(PhysicalOperator physicalOper) { + this.physicalPlan.add(physicalOper); + } + + @Override + public void visit(SparkOpPlanVisitor v) throws VisitorException { + v.visitSparkOp(this); + } + + public void addCrossKey(String key) { + if (crossKeys == null) { + crossKeys = new ArrayList<String>(); + } + crossKeys.add(key); + } + + public List<String> getCrossKeys() { + return crossKeys; + } - public boolean isGroupBy() { + public boolean isGroupBy() { return feature.get(OPER_FEATURE.GROUPBY.ordinal()); } - public void markGroupBy() { + public void markGroupBy() { feature.set(OPER_FEATURE.GROUPBY.ordinal()); } - public boolean isCogroup() { + public boolean isCogroup() { return feature.get(OPER_FEATURE.COGROUP.ordinal()); } - public void markCogroup() { + public void markCogroup() { feature.set(OPER_FEATURE.COGROUP.ordinal()); } - public boolean isRegularJoin() { + public boolean isRegularJoin() { return feature.get(OPER_FEATURE.HASHJOIN.ordinal()); } - public void markRegularJoin() { + public void markRegularJoin() { feature.set(OPER_FEATURE.HASHJOIN.ordinal()); } - public int getRequestedParallelism() { - return requestedParallelism; - } - - public void setSplitter(boolean spl) { - splitter = spl; - } - - public boolean isSplitter() { - return splitter; - } + public int getRequestedParallelism() { + return requestedParallelism; + } - public boolean isSampler() { + public void setSplitter(boolean spl) { + splitter = spl; + } + + public boolean isSplitter() { + return splitter; + } + + public boolean isSampler() { return feature.get(OPER_FEATURE.SAMPLER.ordinal()); } - public void markSampler() { + public void markSampler() { feature.set(OPER_FEATURE.SAMPLER.ordinal()); } - public void setSkewedJoinPartitionFile(String file) { - skewedJoinPartitionFile = file; - } - - public String getSkewedJoinPartitionFile() { - return skewedJoinPartitionFile; - } - - protected boolean usingTypedComparator() { - return usingTypedComparator; - } - - protected void useTypedComparator(boolean useTypedComparator) { - this.usingTypedComparator = useTypedComparator; - } - - protected void noCombineSmallSplits() { - combineSmallSplits = false; - } - - public boolean combineSmallSplits() { - return combineSmallSplits; - } + public void setSkewedJoinPartitionFile(String file) { + skewedJoinPartitionFile = file; + } + + public String getSkewedJoinPartitionFile() { + return skewedJoinPartitionFile; + } + + protected boolean usingTypedComparator() { + return usingTypedComparator; + } + + protected void useTypedComparator(boolean useTypedComparator) { + this.usingTypedComparator = useTypedComparator; + } + + protected void noCombineSmallSplits() { + combineSmallSplits = false; + } + + public boolean combineSmallSplits() { + return combineSmallSplits; + } - public boolean isIndexer() { + public boolean isIndexer() { return feature.get(OPER_FEATURE.INDEXER.ordinal()); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Wed Apr 12 02:20:20 2017 @@ -45,101 +45,101 @@ import org.apache.pig.impl.util.Pair; * stitched in to the "value" */ public class SparkPOPackageAnnotator extends SparkOpPlanVisitor { - private static final Log LOG = LogFactory.getLog(SparkPOPackageAnnotator.class); + private static final Log LOG = LogFactory.getLog(SparkPOPackageAnnotator.class); - public SparkPOPackageAnnotator(SparkOperPlan plan) { - super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); - } - - @Override - public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { - if (!sparkOp.physicalPlan.isEmpty()) { - PackageDiscoverer pkgDiscoverer = new PackageDiscoverer( - sparkOp.physicalPlan); - pkgDiscoverer.visit(); - } - } - - static class PackageDiscoverer extends PhyPlanVisitor { - private POPackage pkg; - private PhysicalPlan plan; - - public PackageDiscoverer(PhysicalPlan plan) { - super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( - plan)); - this.plan = plan; - } - - @Override - public void visitPackage(POPackage pkg) throws VisitorException { - this.pkg = pkg; - - // Find POLocalRearrange(s) corresponding to this POPackage - PhysicalOperator graOp = plan.getPredecessors(pkg).get(0); - if (! (graOp instanceof POGlobalRearrange)) { - throw new OptimizerException("Package operator is not preceded by " + - "GlobalRearrange operator in Spark Plan", 2087, PigException.BUG); - } + public SparkPOPackageAnnotator(SparkOperPlan plan) { + super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); + } + + @Override + public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { + if (!sparkOp.physicalPlan.isEmpty()) { + PackageDiscoverer pkgDiscoverer = new PackageDiscoverer( + sparkOp.physicalPlan); + pkgDiscoverer.visit(); + } + } + + static class PackageDiscoverer extends PhyPlanVisitor { + private POPackage pkg; + private PhysicalPlan plan; + + public PackageDiscoverer(PhysicalPlan plan) { + super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( + plan)); + this.plan = plan; + } + + @Override + public void visitPackage(POPackage pkg) throws VisitorException { + this.pkg = pkg; + + // Find POLocalRearrange(s) corresponding to this POPackage + PhysicalOperator graOp = plan.getPredecessors(pkg).get(0); + if (! (graOp instanceof POGlobalRearrange)) { + throw new OptimizerException("Package operator is not preceded by " + + "GlobalRearrange operator in Spark Plan", 2087, PigException.BUG); + } - List<PhysicalOperator> lraOps = plan.getPredecessors(graOp); - if (pkg.getNumInps() != lraOps.size()) { + List<PhysicalOperator> lraOps = plan.getPredecessors(graOp); + if (pkg.getNumInps() != lraOps.size()) { throw new OptimizerException("Unexpected problem during optimization. " + - "Could not find all LocalRearrange operators. Expected " + pkg.getNumInps() + - ". Got " + lraOps.size() + ".", 2086, PigException.BUG); - } - Collections.sort(lraOps); - for (PhysicalOperator op : lraOps) { - if (! (op instanceof POLocalRearrange)) { - throw new OptimizerException("GlobalRearrange operator can only be preceded by " + - "LocalRearrange operator(s) in Spark Plan", 2087, PigException.BUG); - } - annotatePkgWithLRA((POLocalRearrange)op); - } - }; - - private void annotatePkgWithLRA(POLocalRearrange lrearrange) - throws VisitorException { - - Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo; - if (LOG.isDebugEnabled()) - LOG.debug("Annotating package " + pkg + " with localrearrange operator " + "Could not find all LocalRearrange operators. Expected " + pkg.getNumInps() + + ". Got " + lraOps.size() + ".", 2086, PigException.BUG); + } + Collections.sort(lraOps); + for (PhysicalOperator op : lraOps) { + if (! (op instanceof POLocalRearrange)) { + throw new OptimizerException("GlobalRearrange operator can only be preceded by " + + "LocalRearrange operator(s) in Spark Plan", 2087, PigException.BUG); + } + annotatePkgWithLRA((POLocalRearrange)op); + } + }; + + private void annotatePkgWithLRA(POLocalRearrange lrearrange) + throws VisitorException { + + Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo; + if (LOG.isDebugEnabled()) + LOG.debug("Annotating package " + pkg + " with localrearrange operator " + lrearrange + " with index " + lrearrange.getIndex()); - if (pkg.getPkgr() instanceof LitePackager) { - if (lrearrange.getIndex() != 0) { - throw new RuntimeException( - "POLocalRearrange for POPackageLite cannot have index other than 0, but has index - " - + lrearrange.getIndex()); - } - } - - // annotate the package with information from the LORearrange - // update the keyInfo information if already present in the - // POPackage - keyInfo = pkg.getPkgr().getKeyInfo(); - if (keyInfo == null) - keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); - - if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) { - // something is wrong - we should not be getting key info - // for the same index from two different Local Rearranges - int errCode = 2087; - String msg = "Unexpected problem during optimization." - + " Found index:" + lrearrange.getIndex() - + " in multiple LocalRearrange operators."; - throw new OptimizerException(msg, errCode, PigException.BUG); - - } - keyInfo.put( - Integer.valueOf(lrearrange.getIndex()), - new Pair<Boolean, Map<Integer, Integer>>(lrearrange - .isProjectStar(), lrearrange.getProjectedColsMap())); - if (LOG.isDebugEnabled()) + if (pkg.getPkgr() instanceof LitePackager) { + if (lrearrange.getIndex() != 0) { + throw new RuntimeException( + "POLocalRearrange for POPackageLite cannot have index other than 0, but has index - " + + lrearrange.getIndex()); + } + } + + // annotate the package with information from the LORearrange + // update the keyInfo information if already present in the + // POPackage + keyInfo = pkg.getPkgr().getKeyInfo(); + if (keyInfo == null) + keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); + + if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) { + // something is wrong - we should not be getting key info + // for the same index from two different Local Rearranges + int errCode = 2087; + String msg = "Unexpected problem during optimization." + + " Found index:" + lrearrange.getIndex() + + " in multiple LocalRearrange operators."; + throw new OptimizerException(msg, errCode, PigException.BUG); + + } + keyInfo.put( + Integer.valueOf(lrearrange.getIndex()), + new Pair<Boolean, Map<Integer, Integer>>(lrearrange + .isProjectStar(), lrearrange.getProjectedColsMap())); + if (LOG.isDebugEnabled()) LOG.debug("KeyInfo for packager for package operator " + pkg + " is " + keyInfo ); - pkg.getPkgr().setKeyInfo(keyInfo); - pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); - pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); - } - } + pkg.getPkgr().setKeyInfo(keyInfo); + pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); + pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); + } + } } \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java Wed Apr 12 02:20:20 2017 @@ -34,20 +34,20 @@ import org.apache.pig.impl.plan.VisitorE */ public class SparkPrinter extends SparkOpPlanVisitor { - private PrintStream mStream = null; - private boolean isVerbose = true; + private PrintStream mStream = null; + private boolean isVerbose = true; - public SparkPrinter(PrintStream ps, SparkOperPlan plan) { - super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); - mStream = ps; - mStream.println("#--------------------------------------------------"); - mStream.println("# Spark Plan "); - mStream.println("#--------------------------------------------------"); - } + public SparkPrinter(PrintStream ps, SparkOperPlan plan) { + super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); + mStream = ps; + mStream.println("#--------------------------------------------------"); + mStream.println("# Spark Plan "); + mStream.println("#--------------------------------------------------"); + } - public void setVerbose(boolean verbose) { - isVerbose = verbose; - } + public void setVerbose(boolean verbose) { + isVerbose = verbose; + } @Override public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {