Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1665404&r1=1665403&r2=1665404&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Tue Mar 10 04:37:36 2015 @@ -56,7 +56,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; -import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOper; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; @@ -69,601 +69,652 @@ import org.apache.pig.impl.plan.PlanExce import org.apache.pig.impl.plan.VisitorException; /** - * The compiler that compiles a given physical physicalPlan - * into a DAG of Spark operators + * The compiler that compiles a given physical physicalPlan into a DAG of Spark + * operators */ public class SparkCompiler extends PhyPlanVisitor { - private PigContext pigContext; + private PigContext pigContext; - //The physicalPlan that is being compiled - private PhysicalPlan physicalPlan; + // The physicalPlan that is being compiled + private PhysicalPlan physicalPlan; - //The physicalPlan of Spark Operators - private SparkOperPlan sparkPlan; + // The physicalPlan of Spark Operators + private 
SparkOperPlan sparkPlan; - private SparkOper curSparkOp; - - private String scope; - - private SparkOper[] compiledInputs = null; - - private Map<OperatorKey, SparkOper> splitsSeen; - - private NodeIdGenerator nig; - - private Map<PhysicalOperator,SparkOper> phyToSparkOpMap; - private UDFFinder udfFinder; - - public SparkCompiler(PhysicalPlan physicalPlan, - PigContext pigContext){ - super(physicalPlan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(physicalPlan)); - this.physicalPlan = physicalPlan; - this.pigContext = pigContext; - this.sparkPlan = new SparkOperPlan(); - this.phyToSparkOpMap = new HashMap<PhysicalOperator,SparkOper>(); - this.udfFinder = new UDFFinder(); - this.nig = NodeIdGenerator.getGenerator(); - this.splitsSeen = new HashMap<OperatorKey, SparkOper>(); - - } - - public void compile() throws IOException, PlanException, VisitorException { - List<PhysicalOperator> roots = physicalPlan.getRoots(); - if((roots == null) || (roots.size() <= 0)) { - int errCode = 2053; - String msg = "Internal error. Did not find roots in the physical physicalPlan."; - throw new SparkCompilerException(msg, errCode, PigException.BUG); - } - scope = roots.get(0).getOperatorKey().getScope(); - List<PhysicalOperator> leaves = physicalPlan.getLeaves(); - - if (!pigContext.inIllustrator) - for (PhysicalOperator op : leaves) { - if (!(op instanceof POStore)) { - int errCode = 2025; - String msg = "Expected leaf of reduce physicalPlan to " + - "always be POStore. 
Found " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG); - } - } - - // get all stores and nativeSpark operators, sort them in order(operator id) - // and compile their plans - List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan, POStore.class); - List<PONative> nativeSparks= PlanHelper.getPhysicalOperators(physicalPlan, PONative.class); - List<PhysicalOperator> ops; - if (!pigContext.inIllustrator) { - ops = new ArrayList<PhysicalOperator>(stores.size() + nativeSparks.size()); - ops.addAll(stores); - } else { - ops = new ArrayList<PhysicalOperator>(leaves.size() + nativeSparks.size()); - ops.addAll(leaves); - } - ops.addAll(nativeSparks); - Collections.sort(ops); - - for (PhysicalOperator op : ops) { - compile(op); - } - } - - /** - * Compiles the physicalPlan below op into a Spark Operator - * and stores it in curSparkOp. - * @param op - * @throws IOException - * @throws PlanException - * @throws VisitorException - */ - private void compile(PhysicalOperator op) throws IOException, PlanException, VisitorException { - SparkOper[] prevCompInp = compiledInputs; - - List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op); - if(op instanceof PONative){ - // the predecessor (store) has already been processed - // don't process it again - } - else if (predecessors != null && predecessors.size() > 0) { - // When processing an entire script (multiquery), we can - // get into a situation where a load has - // predecessors. This means that it depends on some store - // earlier in the physicalPlan. We need to take that dependency - // and connect the respective Spark operators, while at the - // same time removing the connection between the Physical - // operators. That way the jobs will run in the right - // order. - if (op instanceof POLoad) { - - if (predecessors.size() != 1) { - int errCode = 2125; - String msg = "Expected at most one predecessor of load. 
Got "+predecessors.size(); - throw new PlanException(msg, errCode, PigException.BUG); - } - - PhysicalOperator p = predecessors.get(0); - SparkOper oper = null; - if(p instanceof POStore || p instanceof PONative){ - oper = phyToSparkOpMap.get(p); - }else{ - int errCode = 2126; - String msg = "Predecessor of load should be a store or spark operator. Got "+p.getClass(); - throw new PlanException(msg, errCode, PigException.BUG); - } - - // Need new operator - curSparkOp = getSparkOp(); - curSparkOp.add(op); - sparkPlan.add(curSparkOp); - physicalPlan.disconnect(op, p); - sparkPlan.connect(oper, curSparkOp); - phyToSparkOpMap.put(op, curSparkOp); - return; - } - - Collections.sort(predecessors); - compiledInputs = new SparkOper[predecessors.size()]; - int i = -1; - for (PhysicalOperator pred : predecessors) { - if(pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())){ - compiledInputs[++i] = startNew(((POSplit)pred).getSplitStore(), splitsSeen.get(pred.getOperatorKey())); - continue; - } - compile(pred); - compiledInputs[++i] = curSparkOp; - } - } else { - //No predecessors. Mostly a load. But this is where - //we start. We create a new sparkOp and add its first - //operator op. Also this should be added to the sparkPlan. 
- curSparkOp = getSparkOp(); - curSparkOp.add(op); - if (op !=null && op instanceof POLoad) - { - if (((POLoad)op).getLFile()!=null && ((POLoad)op).getLFile().getFuncSpec()!=null) - curSparkOp.UDFs.add(((POLoad)op).getLFile().getFuncSpec().toString()); - } - sparkPlan.add(curSparkOp); - phyToSparkOpMap.put(op, curSparkOp); - return; - } - op.visit(this); - compiledInputs = prevCompInp; - } - - - private SparkOper getSparkOp() { - return new SparkOper(OperatorKey.genOpKey(scope)); - } - - public SparkOperPlan getSparkPlan() { - return sparkPlan; - } - - public void connectSoftLink() throws PlanException, IOException { - for (PhysicalOperator op : physicalPlan) { - if (physicalPlan.getSoftLinkPredecessors(op)!=null) { - for (PhysicalOperator pred : physicalPlan.getSoftLinkPredecessors(op)) { - SparkOper from = phyToSparkOpMap.get(pred); - SparkOper to = phyToSparkOpMap.get(op); - if (from==to) - continue; - if (sparkPlan.getPredecessors(to)==null || !sparkPlan.getPredecessors(to).contains(from)) { - sparkPlan.connect(from, to); - } - } - } - } - } - - private SparkOper startNew(FileSpec fSpec, SparkOper old) throws PlanException{ - POLoad ld = getLoad(); - ld.setLFile(fSpec); - SparkOper ret = getSparkOp(); - ret.add(ld); - sparkPlan.add(ret); - sparkPlan.connect(old, ret); - return ret; - } - - - private POLoad getLoad(){ - POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope))); - ld.setPc(pigContext); - ld.setIsTmpLoad(true); - return ld; - } - - @Override - public void visitSplit(POSplit op) throws VisitorException{ - try{ - FileSpec fSpec = op.getSplitStore(); - SparkOper sparkOp = endSingleInputPlanWithStr(fSpec); - sparkOp.setSplitter(true); - splitsSeen.put(op.getOperatorKey(), sparkOp); - curSparkOp = startNew(fSpec, sparkOp); - phyToSparkOpMap.put(op, curSparkOp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, 
PigException.BUG, e); - } - } - - public void visitDistinct(PODistinct op) throws VisitorException{ - try{ - nonBlocking(op); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - private SparkOper endSingleInputPlanWithStr(FileSpec fSpec) throws PlanException{ - if(compiledInputs.length>1) { - int errCode = 2023; - String msg = "Received a multi input physicalPlan when expecting only a single input one."; - throw new PlanException(msg, errCode, PigException.BUG); - } - SparkOper sparkOp = compiledInputs[0]; // Load - POStore str = getStore(); - str.setSFile(fSpec); - sparkOp.plan.addAsLeaf(str); - return sparkOp; - } - - private POStore getStore(){ - POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope))); - // mark store as tmp store. These could be removed by the - // optimizer, because it wasn't the user requesting it. 
- st.setIsTmpStore(true); - return st; - } - - @Override - public void visitLoad(POLoad op) throws VisitorException{ - try{ - nonBlocking(op); - phyToSparkOpMap.put(op, curSparkOp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitNative(PONative op) throws VisitorException{ - try{ - SparkOper nativesparkOpper = getNativeSparkOp(op.getNativeMRjar(), op.getParams()); - sparkPlan.add(nativesparkOpper); - sparkPlan.connect(curSparkOp, nativesparkOpper); - phyToSparkOpMap.put(op, nativesparkOpper); - curSparkOp = nativesparkOpper; - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - private NativeSparkOper getNativeSparkOp(String sparkJar, String[] parameters) { - return new NativeSparkOper(new OperatorKey(scope,nig.getNextNodeId(scope)), sparkJar, parameters); - } - - @Override - public void visitStore(POStore op) throws VisitorException{ - try{ - nonBlocking(op); - phyToSparkOpMap.put(op, curSparkOp); - if (op.getSFile()!=null && op.getSFile().getFuncSpec()!=null) - curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString()); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitFilter(POFilter op) throws VisitorException{ - try{ - nonBlocking(op); - processUDFs(op.getPlan()); - phyToSparkOpMap.put(op, curSparkOp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitCross(POCross op) 
throws VisitorException { - try { - nonBlocking(op); - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitStream(POStream op) throws VisitorException{ - try{ - POStreamSpark poStreamSpark = new POStreamSpark(op); - nonBlocking(poStreamSpark); - phyToSparkOpMap.put(op, curSparkOp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitSort(POSort op) throws VisitorException{ - try{ - nonBlocking(op); - phyToSparkOpMap.put(op, curSparkOp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitLimit(POLimit op) throws VisitorException { - try { - nonBlocking(op); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitLocalRearrange(POLocalRearrange op) throws VisitorException { - try{ - nonBlocking(op); - List<PhysicalPlan> plans = op.getPlans(); - if(plans!=null) - for(PhysicalPlan ep : plans) - processUDFs(ep); - phyToSparkOpMap.put(op, curSparkOp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitCollectedGroup(POCollectedGroup op) throws VisitorException { - List<PhysicalOperator> roots = curSparkOp.plan.getRoots(); - if(roots.size() != 
1){ - int errCode = 2171; - String errMsg = "Expected one but found more then one root physical operator in physical physicalPlan."; - throw new SparkCompilerException(errMsg,errCode,PigException.BUG); - } - - PhysicalOperator phyOp = roots.get(0); - if(! (phyOp instanceof POLoad)){ - int errCode = 2172; - String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName(); - throw new SparkCompilerException(errMsg,errCode,PigException.BUG); - } - - LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc(); - try { - if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){ - int errCode = 2249; - throw new SparkCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", errCode); - } - ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit(); - } catch (SparkCompilerException e){ - throw (e); - } catch (IOException e) { - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - - try{ - nonBlocking(op); - phyToSparkOpMap.put(op, curSparkOp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitPOForEach(POForEach op) throws VisitorException{ - try{ - nonBlocking(op); - List<PhysicalPlan> plans = op.getInputPlans(); - if (plans != null) { - for (PhysicalPlan ep : plans) { - processUDFs(ep); - } - } - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{ - try{ - blocking(op); - 
curSparkOp.customPartitioner = op.getCustomPartitioner(); - phyToSparkOpMap.put(op, curSparkOp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitPackage(POPackage op) throws VisitorException{ - try{ - nonBlocking(op); - phyToSparkOpMap.put(op, curSparkOp); - if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) { - curSparkOp.markRegularJoin(); - } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) { - if (op.getNumInps() == 1) { - curSparkOp.markGroupBy(); - } else if (op.getNumInps() > 1) { - curSparkOp.markCogroup(); - } - } - - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - @Override - public void visitUnion(POUnion op) throws VisitorException{ - try{ - nonBlocking(op); - phyToSparkOpMap.put(op, curSparkOp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } - } - - - @Override - public void visitSkewedJoin(POSkewedJoin op) throws VisitorException { - //TODO - } - - @Override - public void visitFRJoin(POFRJoin op) throws VisitorException { - //TODO - } - - @Override - public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException { - //TODO - } - - private void processUDFs(PhysicalPlan plan) throws VisitorException{ - if(plan!=null){ - //Process Scalars (UDF with referencedOperators) - ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan); - scalarPhyFinder.visit(); - curSparkOp.scalars.addAll(scalarPhyFinder.getScalars()); - - //Process UDFs - udfFinder.setPlan(plan); - udfFinder.visit(); - curSparkOp.UDFs.addAll(udfFinder.getUDFs()); - } - } 
- - private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{ - SparkOper sparkOp=null; - if (compiledInputs.length == 1) { - sparkOp = compiledInputs[0]; - } else { - sparkOp = merge(compiledInputs); - } - sparkOp.plan.addAsLeaf(op); - curSparkOp = sparkOp; - } - - private void blocking(PhysicalOperator op) throws PlanException, IOException{ - SparkOper sparkOp = getSparkOp(); - sparkPlan.add(sparkOp); - for(SparkOper compileInput: compiledInputs){ - sparkPlan.connect(compileInput, sparkOp); - } - sparkOp.plan.addAsLeaf(op); - curSparkOp = sparkOp; - } - - private SparkOper merge(SparkOper[] compiledInputs)throws PlanException { - SparkOper ret = getSparkOp(); - sparkPlan.add(ret); - - Set<SparkOper> toBeConnected = new HashSet<SparkOper>(); - List<SparkOper> toBeRemoved = new ArrayList<SparkOper>(); - - List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>(); - - for (SparkOper sparkOp : compiledInputs) { - toBeRemoved.add(sparkOp); - toBeMerged.add(sparkOp.plan); - List<SparkOper> predecessors = sparkPlan.getPredecessors(sparkOp); - if( predecessors != null){ - for( SparkOper predecessorSparkOp: predecessors){ - toBeConnected.add(predecessorSparkOp); - } - } - } - merge(ret.plan, toBeMerged); - - Iterator<SparkOper> it = toBeConnected.iterator(); - while(it.hasNext()) - sparkPlan.connect(it.next(), ret); - for(SparkOper removeSparkOp : toBeRemoved){ - if(removeSparkOp.requestedParallelism > ret.requestedParallelism) - ret.requestedParallelism = removeSparkOp.requestedParallelism; - for (String udf:removeSparkOp.UDFs) - { - if (!ret.UDFs.contains(udf)) - ret.UDFs.add(udf); - } - // We also need to change scalar marking - for(PhysicalOperator physOp: removeSparkOp.scalars) { - if(!ret.scalars.contains(physOp)) { - ret.scalars.add(physOp); - } - } - Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>(); - for (Map.Entry<PhysicalOperator, SparkOper> entry : phyToSparkOpMap.entrySet()) { - if 
(entry.getValue()==removeSparkOp) { - opsToChange.add(entry.getKey()); - } - } - for (PhysicalOperator op : opsToChange) { - phyToSparkOpMap.put(op, ret); - } - - sparkPlan.remove(removeSparkOp); - } - return ret; - } - - /** - * The merge of a list of plans into a single physicalPlan - * @param <O> - * @param <E> - * @param finPlan - Final Plan into which the list of plans is merged - * @param plans - list of plans to be merged - * @throws PlanException - */ - private <O extends Operator<?>, E extends OperatorPlan<O>> void merge( - E finPlan, List<E> plans) throws PlanException { - for (E e : plans) { - finPlan.merge(e); - } - } + private SparkOperator curSparkOp; + + private String scope; + + private SparkOperator[] compiledInputs = null; + + private Map<OperatorKey, SparkOperator> splitsSeen; + + private NodeIdGenerator nig; + + private Map<PhysicalOperator, SparkOperator> phyToSparkOpMap; + private UDFFinder udfFinder; + + public SparkCompiler(PhysicalPlan physicalPlan, PigContext pigContext) { + super(physicalPlan, + new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( + physicalPlan)); + this.physicalPlan = physicalPlan; + this.pigContext = pigContext; + this.sparkPlan = new SparkOperPlan(); + this.phyToSparkOpMap = new HashMap<PhysicalOperator, SparkOperator>(); + this.udfFinder = new UDFFinder(); + this.nig = NodeIdGenerator.getGenerator(); + this.splitsSeen = new HashMap<OperatorKey, SparkOperator>(); + + } + + public void compile() throws IOException, PlanException, VisitorException { + List<PhysicalOperator> roots = physicalPlan.getRoots(); + if ((roots == null) || (roots.size() <= 0)) { + int errCode = 2053; + String msg = "Internal error. 
Did not find roots in the physical physicalPlan."; + throw new SparkCompilerException(msg, errCode, PigException.BUG); + } + scope = roots.get(0).getOperatorKey().getScope(); + List<PhysicalOperator> leaves = physicalPlan.getLeaves(); + + if (!pigContext.inIllustrator) + for (PhysicalOperator op : leaves) { + if (!(op instanceof POStore)) { + int errCode = 2025; + String msg = "Expected leaf of reduce physicalPlan to " + + "always be POStore. Found " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, + PigException.BUG); + } + } + + // get all stores and nativeSpark operators, sort them in order(operator + // id) + // and compile their plans + List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan, + POStore.class); + List<PONative> nativeSparks = PlanHelper.getPhysicalOperators( + physicalPlan, PONative.class); + List<PhysicalOperator> ops; + if (!pigContext.inIllustrator) { + ops = new ArrayList<PhysicalOperator>(stores.size() + + nativeSparks.size()); + ops.addAll(stores); + } else { + ops = new ArrayList<PhysicalOperator>(leaves.size() + + nativeSparks.size()); + ops.addAll(leaves); + } + ops.addAll(nativeSparks); + Collections.sort(ops); + + for (PhysicalOperator op : ops) { + compile(op); + } + } + + /** + * Compiles the physicalPlan below op into a Spark Operator and stores it in + * curSparkOp. + * + * @param op + * @throws IOException + * @throws PlanException + * @throws VisitorException + */ + private void compile(PhysicalOperator op) throws IOException, + PlanException, VisitorException { + SparkOperator[] prevCompInp = compiledInputs; + + List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op); + if (op instanceof PONative) { + // the predecessor (store) has already been processed + // don't process it again + } else if (predecessors != null && predecessors.size() > 0) { + // When processing an entire script (multiquery), we can + // get into a situation where a load has + // predecessors. 
This means that it depends on some store + // earlier in the physicalPlan. We need to take that dependency + // and connect the respective Spark operators, while at the + // same time removing the connection between the Physical + // operators. That way the jobs will run in the right + // order. + if (op instanceof POLoad) { + + if (predecessors.size() != 1) { + int errCode = 2125; + String msg = "Expected at most one predecessor of load. Got " + + predecessors.size(); + throw new PlanException(msg, errCode, PigException.BUG); + } + + PhysicalOperator p = predecessors.get(0); + SparkOperator oper = null; + if (p instanceof POStore || p instanceof PONative) { + oper = phyToSparkOpMap.get(p); + } else { + int errCode = 2126; + String msg = "Predecessor of load should be a store or spark operator. Got " + + p.getClass(); + throw new PlanException(msg, errCode, PigException.BUG); + } + + // Need new operator + curSparkOp = getSparkOp(); + curSparkOp.add(op); + sparkPlan.add(curSparkOp); + physicalPlan.disconnect(op, p); + sparkPlan.connect(oper, curSparkOp); + phyToSparkOpMap.put(op, curSparkOp); + return; + } + + Collections.sort(predecessors); + compiledInputs = new SparkOperator[predecessors.size()]; + int i = -1; + for (PhysicalOperator pred : predecessors) { + if (pred instanceof POSplit + && splitsSeen.containsKey(pred.getOperatorKey())) { + compiledInputs[++i] = startNew( + ((POSplit) pred).getSplitStore(), + splitsSeen.get(pred.getOperatorKey())); + continue; + } + compile(pred); + compiledInputs[++i] = curSparkOp; + } + } else { + // No predecessors. Mostly a load. But this is where + // we start. We create a new sparkOp and add its first + // operator op. Also this should be added to the sparkPlan. 
+ curSparkOp = getSparkOp(); + curSparkOp.add(op); + if (op != null && op instanceof POLoad) { + if (((POLoad) op).getLFile() != null + && ((POLoad) op).getLFile().getFuncSpec() != null) + curSparkOp.UDFs.add(((POLoad) op).getLFile().getFuncSpec() + .toString()); + } + sparkPlan.add(curSparkOp); + phyToSparkOpMap.put(op, curSparkOp); + return; + } + op.visit(this); + compiledInputs = prevCompInp; + } + + private SparkOperator getSparkOp() { + return new SparkOperator(OperatorKey.genOpKey(scope)); + } + + public SparkOperPlan getSparkPlan() { + return sparkPlan; + } + + public void connectSoftLink() throws PlanException, IOException { + for (PhysicalOperator op : physicalPlan) { + if (physicalPlan.getSoftLinkPredecessors(op) != null) { + for (PhysicalOperator pred : physicalPlan + .getSoftLinkPredecessors(op)) { + SparkOperator from = phyToSparkOpMap.get(pred); + SparkOperator to = phyToSparkOpMap.get(op); + if (from == to) + continue; + if (sparkPlan.getPredecessors(to) == null + || !sparkPlan.getPredecessors(to).contains(from)) { + sparkPlan.connect(from, to); + } + } + } + } + } + + private SparkOperator startNew(FileSpec fSpec, SparkOperator old) + throws PlanException { + POLoad ld = getLoad(); + ld.setLFile(fSpec); + SparkOperator ret = getSparkOp(); + ret.add(ld); + sparkPlan.add(ret); + sparkPlan.connect(old, ret); + return ret; + } + + private POLoad getLoad() { + POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope))); + ld.setPc(pigContext); + ld.setIsTmpLoad(true); + return ld; + } + + @Override + public void visitSplit(POSplit op) throws VisitorException { + try { + FileSpec fSpec = op.getSplitStore(); + SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec); + sparkOp.setSplitter(true); + splitsSeen.put(op.getOperatorKey(), sparkOp); + curSparkOp = startNew(fSpec, sparkOp); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + 
op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + public void visitDistinct(PODistinct op) throws VisitorException { + try { + nonBlocking(op); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + private SparkOperator endSingleInputPlanWithStr(FileSpec fSpec) + throws PlanException { + if (compiledInputs.length > 1) { + int errCode = 2023; + String msg = "Received a multi input physicalPlan when expecting only a single input one."; + throw new PlanException(msg, errCode, PigException.BUG); + } + SparkOperator sparkOp = compiledInputs[0]; // Load + POStore str = getStore(); + str.setSFile(fSpec); + sparkOp.physicalPlan.addAsLeaf(str); + return sparkOp; + } + + private POStore getStore() { + POStore st = new POStore(new OperatorKey(scope, + nig.getNextNodeId(scope))); + // mark store as tmp store. These could be removed by the + // optimizer, because it wasn't the user requesting it. 
+ st.setIsTmpStore(true); + return st; + } + + @Override + public void visitLoad(POLoad op) throws VisitorException { + try { + nonBlocking(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitNative(PONative op) throws VisitorException { + try { + SparkOperator nativesparkOpper = getNativeSparkOp( + op.getNativeMRjar(), op.getParams()); + sparkPlan.add(nativesparkOpper); + sparkPlan.connect(curSparkOp, nativesparkOpper); + phyToSparkOpMap.put(op, nativesparkOpper); + curSparkOp = nativesparkOpper; + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + private NativeSparkOperator getNativeSparkOp(String sparkJar, + String[] parameters) { + return new NativeSparkOperator(new OperatorKey(scope, + nig.getNextNodeId(scope)), sparkJar, parameters); + } + + @Override + public void visitStore(POStore op) throws VisitorException { + try { + nonBlocking(op); + phyToSparkOpMap.put(op, curSparkOp); + if (op.getSFile() != null && op.getSFile().getFuncSpec() != null) + curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString()); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitFilter(POFilter op) throws VisitorException { + try { + nonBlocking(op); + processUDFs(op.getPlan()); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } 
+ + @Override + public void visitCross(POCross op) throws VisitorException { + try { + nonBlocking(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitStream(POStream op) throws VisitorException { + try { + POStreamSpark poStreamSpark = new POStreamSpark(op); + nonBlocking(poStreamSpark); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitSort(POSort op) throws VisitorException { + try { + nonBlocking(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitLimit(POLimit op) throws VisitorException { + try { + nonBlocking(op); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitLocalRearrange(POLocalRearrange op) + throws VisitorException { + try { + nonBlocking(op); + List<PhysicalPlan> plans = op.getPlans(); + if (plans != null) + for (PhysicalPlan ep : plans) + processUDFs(ep); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitCollectedGroup(POCollectedGroup op) + throws VisitorException { 
+ List<PhysicalOperator> roots = curSparkOp.physicalPlan.getRoots(); + if (roots.size() != 1) { + int errCode = 2171; + String errMsg = "Expected one but found more then one root physical operator in physical physicalPlan."; + throw new SparkCompilerException(errMsg, errCode, PigException.BUG); + } + + PhysicalOperator phyOp = roots.get(0); + if (!(phyOp instanceof POLoad)) { + int errCode = 2172; + String errMsg = "Expected physical operator at root to be POLoad. Found : " + + phyOp.getClass().getCanonicalName(); + throw new SparkCompilerException(errMsg, errCode, PigException.BUG); + } + + LoadFunc loadFunc = ((POLoad) phyOp).getLoadFunc(); + try { + if (!(CollectableLoadFunc.class.isAssignableFrom(loadFunc + .getClass()))) { + int errCode = 2249; + throw new SparkCompilerException( + "While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", + errCode); + } + ((CollectableLoadFunc) loadFunc).ensureAllKeyInstancesInSameSplit(); + } catch (SparkCompilerException e) { + throw (e); + } catch (IOException e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + + try { + nonBlocking(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitPOForEach(POForEach op) throws VisitorException { + try { + nonBlocking(op); + List<PhysicalPlan> plans = op.getInputPlans(); + if (plans != null) { + for (PhysicalPlan ep : plans) { + processUDFs(ep); + } + } + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + 
} + + @Override + public void visitGlobalRearrange(POGlobalRearrange op) + throws VisitorException { + try { + blocking(op); + curSparkOp.customPartitioner = op.getCustomPartitioner(); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitPackage(POPackage op) throws VisitorException { + try { + nonBlocking(op); + phyToSparkOpMap.put(op, curSparkOp); + if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) { + curSparkOp.markRegularJoin(); + } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) { + if (op.getNumInps() == 1) { + curSparkOp.markGroupBy(); + } else if (op.getNumInps() > 1) { + curSparkOp.markCogroup(); + } + } + + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitUnion(POUnion op) throws VisitorException { + try { + nonBlocking(op); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void visitSkewedJoin(POSkewedJoin op) throws VisitorException { + // TODO + } + + @Override + public void visitFRJoin(POFRJoin op) throws VisitorException { + // TODO + } + + @Override + public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException { + // TODO + } + + private void processUDFs(PhysicalPlan plan) throws VisitorException { + if (plan != null) { + // Process Scalars (UDF with referencedOperators) + ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan); + scalarPhyFinder.visit(); + 
curSparkOp.scalars.addAll(scalarPhyFinder.getScalars()); + + // Process UDFs + udfFinder.setPlan(plan); + udfFinder.visit(); + curSparkOp.UDFs.addAll(udfFinder.getUDFs()); + } + } + + private void nonBlocking(PhysicalOperator op) throws PlanException, + IOException { + SparkOperator sparkOp = null; + if (compiledInputs.length == 1) { + sparkOp = compiledInputs[0]; + } else { + sparkOp = merge(compiledInputs); + } + sparkOp.physicalPlan.addAsLeaf(op); + curSparkOp = sparkOp; + } + + private void blocking(PhysicalOperator op) throws PlanException, + IOException { + SparkOperator sparkOp = getSparkOp(); + sparkPlan.add(sparkOp); + for (SparkOperator compileInput : compiledInputs) { + sparkPlan.connect(compileInput, sparkOp); + } + sparkOp.physicalPlan.addAsLeaf(op); + curSparkOp = sparkOp; + } + + private SparkOperator merge(SparkOperator[] compiledInputs) + throws PlanException { + SparkOperator ret = getSparkOp(); + sparkPlan.add(ret); + + Set<SparkOperator> toBeConnected = new HashSet<SparkOperator>(); + List<SparkOperator> toBeRemoved = new ArrayList<SparkOperator>(); + + List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>(); + + for (SparkOperator sparkOp : compiledInputs) { + toBeRemoved.add(sparkOp); + toBeMerged.add(sparkOp.physicalPlan); + List<SparkOperator> predecessors = sparkPlan + .getPredecessors(sparkOp); + if (predecessors != null) { + for (SparkOperator predecessorSparkOp : predecessors) { + toBeConnected.add(predecessorSparkOp); + } + } + } + merge(ret.physicalPlan, toBeMerged); + + Iterator<SparkOperator> it = toBeConnected.iterator(); + while (it.hasNext()) + sparkPlan.connect(it.next(), ret); + for (SparkOperator removeSparkOp : toBeRemoved) { + if (removeSparkOp.requestedParallelism > ret.requestedParallelism) + ret.requestedParallelism = removeSparkOp.requestedParallelism; + for (String udf : removeSparkOp.UDFs) { + if (!ret.UDFs.contains(udf)) + ret.UDFs.add(udf); + } + // We also need to change scalar marking + for (PhysicalOperator 
physOp : removeSparkOp.scalars) { + if (!ret.scalars.contains(physOp)) { + ret.scalars.add(physOp); + } + } + Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>(); + for (Map.Entry<PhysicalOperator, SparkOperator> entry : phyToSparkOpMap + .entrySet()) { + if (entry.getValue() == removeSparkOp) { + opsToChange.add(entry.getKey()); + } + } + for (PhysicalOperator op : opsToChange) { + phyToSparkOpMap.put(op, ret); + } + + sparkPlan.remove(removeSparkOp); + } + return ret; + } + + /** + * The merge of a list of plans into a single plan + * + * @param <O> + * @param <E> + * @param finPlan + * - Final Plan into which the list of plans is merged + * @param plans + * - list of plans to be merged + * @throws PlanException + */ + private <O extends Operator<?>, E extends OperatorPlan<O>> void merge( + E finPlan, List<E> plans) throws PlanException { + for (E e : plans) { + finPlan.merge(e); + } + } }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java?rev=1665404&r1=1665403&r2=1665404&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java Tue Mar 10 04:37:36 2015 @@ -24,15 +24,18 @@ import org.apache.pig.impl.plan.VisitorE /** * A visitor for the SparkOperPlan class */ -public class SparkOpPlanVisitor extends PlanVisitor<SparkOper, SparkOperPlan> { +public class SparkOpPlanVisitor extends + PlanVisitor<SparkOperator, SparkOperPlan> { - public SparkOpPlanVisitor(SparkOperPlan plan, PlanWalker<SparkOper, SparkOperPlan> walker) { - super(plan, walker); - // TODO Auto-generated constructor stub - } + public SparkOpPlanVisitor(SparkOperPlan plan, + PlanWalker<SparkOperator, SparkOperPlan> walker) { + super(plan, walker); + // TODO Auto-generated constructor stub + } - public void visitSparkOp(SparkOper sparkOper) throws VisitorException { - // TODO Auto-generated method stub - } + public void visitSparkOp(SparkOperator sparkOperator) + throws VisitorException { + // TODO Auto-generated method stub + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java?rev=1665404&r1=1665403&r2=1665404&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java (original) +++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java Tue Mar 10 04:37:36 2015 @@ -24,23 +24,23 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.VisitorException; /** - * A Plan used to create the plan of - * Spark Operators + * A Plan used to create the plan of Spark Operators */ -public class SparkOperPlan extends OperatorPlan<SparkOper> { +public class SparkOperPlan extends OperatorPlan<SparkOperator> { - @Override - public String toString() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); - SparkPrinter printer = new SparkPrinter(ps, this); - printer.setVerbose(true); - try { - printer.visit(); - } catch (VisitorException e) { - // TODO Auto-generated catch block - throw new RuntimeException("Unable to get String representation of plan:"+e, e ); - } - return baos.toString(); - } + @Override + public String toString() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + SparkPrinter printer = new SparkPrinter(ps, this); + printer.setVerbose(true); + try { + printer.visit(); + } catch (VisitorException e) { + // TODO Auto-generated catch block + throw new RuntimeException( + "Unable to get String representation of plan:" + e, e); + } + return baos.toString(); + } } Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1665404&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Tue Mar 10 04:37:36 2015 @@ -0,0 +1,195 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.plan; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.impl.plan.Operator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.VisitorException; + +/** + * An operator model for a Spark job. Acts as a host to the plans that will + * execute in spark. 
+ */ +public class SparkOperator extends Operator<SparkOpPlanVisitor> { + private static enum OPER_FEATURE { + NONE, + // Indicate if this job is a sampling job + SAMPLER, + // Indicate if this job is a merge indexer + INDEXER, + // Indicate if this job is a group by job + GROUPBY, + // Indicate if this job is a cogroup job + COGROUP, + // Indicate if this job is a regular join job + HASHJOIN; + }; + + public PhysicalPlan physicalPlan; + + public Set<String> UDFs; + + /* Name of the Custom Partitioner used */ + public String customPartitioner = null; + + public Set<PhysicalOperator> scalars; + + public boolean isUDFComparatorUsed = false; + + public int requestedParallelism = -1; + + private OPER_FEATURE feature = OPER_FEATURE.NONE; + + private boolean splitter = false; + + // Name of the partition file generated by sampling process, + // Used by Skewed Join + private String skewedJoinPartitionFile; + + private boolean usingTypedComparator = false; + + private boolean combineSmallSplits = true; + + public SparkOperator(OperatorKey k) { + super(k); + physicalPlan = new PhysicalPlan(); + UDFs = new HashSet<String>(); + scalars = new HashSet<PhysicalOperator>(); + } + + @Override + public boolean supportsMultipleInputs() { + return true; + } + + @Override + public boolean supportsMultipleOutputs() { + return true; + } + + @Override + public String name() { + String udfStr = getUDFsAsStr(); + StringBuilder sb = new StringBuilder("Spark" + "(" + + requestedParallelism + (udfStr.equals("") ? 
"" : ",") + + udfStr + ")" + " - " + mKey.toString()); + return sb.toString(); + } + + private String getUDFsAsStr() { + StringBuilder sb = new StringBuilder(); + if (UDFs != null && UDFs.size() > 0) { + for (String str : UDFs) { + sb.append(str.substring(str.lastIndexOf('.') + 1)); + sb.append(','); + } + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + public void add(PhysicalOperator physicalOper) { + this.physicalPlan.add(physicalOper); + } + + @Override + public void visit(SparkOpPlanVisitor v) throws VisitorException { + v.visitSparkOp(this); + } + + public boolean isGroupBy() { + return (feature == OPER_FEATURE.GROUPBY); + } + + public void markGroupBy() { + feature = OPER_FEATURE.GROUPBY; + } + + public boolean isCogroup() { + return (feature == OPER_FEATURE.COGROUP); + } + + public void markCogroup() { + feature = OPER_FEATURE.COGROUP; + } + + public boolean isRegularJoin() { + return (feature == OPER_FEATURE.HASHJOIN); + } + + public void markRegularJoin() { + feature = OPER_FEATURE.HASHJOIN; + } + + public int getRequestedParallelism() { + return requestedParallelism; + } + + public void setSplitter(boolean spl) { + splitter = spl; + } + + public boolean isSplitter() { + return splitter; + } + + public boolean isSampler() { + return (feature == OPER_FEATURE.SAMPLER); + } + + public void markSampler() { + feature = OPER_FEATURE.SAMPLER; + } + + public void setSkewedJoinPartitionFile(String file) { + skewedJoinPartitionFile = file; + } + + public String getSkewedJoinPartitionFile() { + return skewedJoinPartitionFile; + } + + protected boolean usingTypedComparator() { + return usingTypedComparator; + } + + protected void useTypedComparator(boolean useTypedComparator) { + this.usingTypedComparator = useTypedComparator; + } + + protected void noCombineSmallSplits() { + combineSmallSplits = false; + } + + public boolean combineSmallSplits() { + return combineSmallSplits; + } + + public boolean isIndexer() { + return (feature == 
OPER_FEATURE.INDEXER); + } + + public void markIndexer() { + feature = OPER_FEATURE.INDEXER; + } +} Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1665404&r1=1665403&r2=1665404&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Tue Mar 10 04:37:36 2015 @@ -34,132 +34,141 @@ import org.apache.pig.impl.plan.optimize import org.apache.pig.impl.util.Pair; /** - * This visitor visits the SparkPlan and does the following - * for each SparkOper - * - visits the POPackage in the plan and finds the corresponding - * POLocalRearrange(s). It then annotates the POPackage - * with information about which columns in the "value" are present in the - * "key" and will need to stitched in to the "value" + * This visitor visits the SparkPlan and does the following for each + * SparkOperator - visits the POPackage in the plan and finds the corresponding + * POLocalRearrange(s). 
It then annotates the POPackage with information about + * which columns in the "value" are present in the "key" and will need to be + * stitched into the "value" */ public class SparkPOPackageAnnotator extends SparkOpPlanVisitor { - public SparkPOPackageAnnotator(SparkOperPlan plan) { - super(plan, new DepthFirstWalker<SparkOper, SparkOperPlan>(plan)); - } - - @Override - public void visitSparkOp(SparkOper sparkOp) throws VisitorException { - if(!sparkOp.plan.isEmpty()) { - PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(sparkOp.plan); - pkgDiscoverer.visit(); - POPackage pkg = pkgDiscoverer.getPkg(); - if(pkg != null) { - handlePackage(sparkOp, pkg); - } - } - } - - private void handlePackage(SparkOper pkgSparkOp, POPackage pkg) throws VisitorException { - int lrFound = 0; - List<SparkOper> predecessors = this.mPlan.getPredecessors(pkgSparkOp); - if (predecessors != null && predecessors.size() > 0) { - for (SparkOper pred : predecessors) { - lrFound += patchPackage(pred, pkgSparkOp, pkg); - if(lrFound == pkg.getNumInps()) { - break; - } - } - } - if (lrFound != pkg.getNumInps()) { - int errCode = 2086; - String msg = "Unexpected problem during optimization. 
Could not find all LocalRearrange operators."; - throw new OptimizerException(msg, errCode, PigException.BUG); - } - } - - private int patchPackage(SparkOper pred , SparkOper pkgSparkOp, POPackage pkg) throws VisitorException { - LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(pred.plan, pkg); - lrDiscoverer.visit(); - // let our caller know if we managed to patch - // the package - return lrDiscoverer.getLoRearrangeFound(); - } - - - static class PackageDiscoverer extends PhyPlanVisitor { - - private POPackage pkg; - - public PackageDiscoverer(PhysicalPlan plan) { - super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); - } - - @Override - public void visitPackage(POPackage pkg) throws VisitorException { - this.pkg = pkg; - }; - - /** - * @return the pkg - */ - public POPackage getPkg() { - return pkg; - } - - } - - - static class LoRearrangeDiscoverer extends PhyPlanVisitor { - - private int loRearrangeFound = 0; - private POPackage pkg; - - public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) { - super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); - this.pkg = pkg; - } - - @Override - public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException { - loRearrangeFound++; - Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo; - - if (pkg.getPkgr() instanceof LitePackager) { - if(lrearrange.getIndex() != 0) { - // Throw some exception here - throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex()); - } - } - - // annotate the package with information from the LORearrange - // update the keyInfo information if already present in the POPackage - keyInfo = pkg.getPkgr().getKeyInfo(); - if(keyInfo == null) - keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); - - if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) { - // something is wrong - we should not be getting key 
info - // for the same index from two different Local Rearranges - int errCode = 2087; - String msg = "Unexpected problem during optimization." + - " Found index:" + lrearrange.getIndex() + - " in multiple LocalRearrange operators."; - throw new OptimizerException(msg, errCode, PigException.BUG); - - } - keyInfo.put(Integer.valueOf(lrearrange.getIndex()), - new Pair<Boolean, Map<Integer, Integer>>( - lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); - pkg.getPkgr().setKeyInfo(keyInfo); - pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); - pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); - } - - /** - * @return the loRearrangeFound - */ - public int getLoRearrangeFound() { - return loRearrangeFound; - } + public SparkPOPackageAnnotator(SparkOperPlan plan) { + super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); + } + + @Override + public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { + if (!sparkOp.physicalPlan.isEmpty()) { + PackageDiscoverer pkgDiscoverer = new PackageDiscoverer( + sparkOp.physicalPlan); + pkgDiscoverer.visit(); + POPackage pkg = pkgDiscoverer.getPkg(); + if (pkg != null) { + handlePackage(sparkOp, pkg); + } + } + } + + private void handlePackage(SparkOperator pkgSparkOp, POPackage pkg) + throws VisitorException { + int lrFound = 0; + List<SparkOperator> predecessors = this.mPlan + .getPredecessors(pkgSparkOp); + if (predecessors != null && predecessors.size() > 0) { + for (SparkOperator pred : predecessors) { + lrFound += patchPackage(pred, pkgSparkOp, pkg); + if (lrFound == pkg.getNumInps()) { + break; + } + } + } + if (lrFound != pkg.getNumInps()) { + int errCode = 2086; + String msg = "Unexpected problem during optimization. 
Could not find all LocalRearrange operators."; + throw new OptimizerException(msg, errCode, PigException.BUG); + } + } + + private int patchPackage(SparkOperator pred, SparkOperator pkgSparkOp, + POPackage pkg) throws VisitorException { + LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer( + pred.physicalPlan, pkg); + lrDiscoverer.visit(); + // let our caller know if we managed to patch + // the package + return lrDiscoverer.getLoRearrangeFound(); + } + + static class PackageDiscoverer extends PhyPlanVisitor { + + private POPackage pkg; + + public PackageDiscoverer(PhysicalPlan plan) { + super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( + plan)); + } + + @Override + public void visitPackage(POPackage pkg) throws VisitorException { + this.pkg = pkg; + }; + + /** + * @return the pkg + */ + public POPackage getPkg() { + return pkg; + } + + } + + static class LoRearrangeDiscoverer extends PhyPlanVisitor { + + private int loRearrangeFound = 0; + private POPackage pkg; + + public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) { + super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( + plan)); + this.pkg = pkg; + } + + @Override + public void visitLocalRearrange(POLocalRearrange lrearrange) + throws VisitorException { + loRearrangeFound++; + Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo; + + if (pkg.getPkgr() instanceof LitePackager) { + if (lrearrange.getIndex() != 0) { + // Throw some exception here + throw new RuntimeException( + "POLocalRearrange for POPackageLite cannot have index other than 0, but has index - " + + lrearrange.getIndex()); + } + } + + // annotate the package with information from the LORearrange + // update the keyInfo information if already present in the + // POPackage + keyInfo = pkg.getPkgr().getKeyInfo(); + if (keyInfo == null) + keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); + + if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) { + // something 
is wrong - we should not be getting key info + // for the same index from two different Local Rearranges + int errCode = 2087; + String msg = "Unexpected problem during optimization." + + " Found index:" + lrearrange.getIndex() + + " in multiple LocalRearrange operators."; + throw new OptimizerException(msg, errCode, PigException.BUG); + + } + keyInfo.put( + Integer.valueOf(lrearrange.getIndex()), + new Pair<Boolean, Map<Integer, Integer>>(lrearrange + .isProjectStar(), lrearrange.getProjectedColsMap())); + pkg.getPkgr().setKeyInfo(keyInfo); + pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); + pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); + } + + /** + * @return the loRearrangeFound + */ + public int getLoRearrangeFound() { + return loRearrangeFound; + } - } + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1665404&r1=1665403&r2=1665404&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java Tue Mar 10 04:37:36 2015 @@ -22,7 +22,7 @@ import java.io.PrintStream; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; -import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOper; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -31,36 +31,36 @@ import 
org.apache.pig.impl.plan.VisitorE */ public class SparkPrinter extends SparkOpPlanVisitor { + private PrintStream mStream = null; + private boolean isVerbose = true; - private PrintStream mStream = null; - private boolean isVerbose = true; - - public SparkPrinter(PrintStream ps, SparkOperPlan plan) { - super(plan, new DepthFirstWalker<SparkOper, SparkOperPlan>(plan)); - mStream = ps; - mStream.println("#--------------------------------------------------"); - mStream.println("# Spark Plan "); - mStream.println("#--------------------------------------------------"); - } - - public void setVerbose(boolean verbose) { - isVerbose = verbose; - } - - @Override - public void visitSparkOp(SparkOper sparkOp) throws VisitorException { - mStream.println(""); - mStream.println("Spark node " + sparkOp.getOperatorKey().toString()); - if(sparkOp instanceof NativeSparkOper) { - mStream.println("--------"); - mStream.println(); - return; - } - if (sparkOp.plan != null && sparkOp.plan.size() > 0) { - PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(sparkOp.plan, mStream); - printer.setVerbose(isVerbose); - printer.visit(); - mStream.println("--------"); - } - } + public SparkPrinter(PrintStream ps, SparkOperPlan plan) { + super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); + mStream = ps; + mStream.println("#--------------------------------------------------"); + mStream.println("# Spark Plan "); + mStream.println("#--------------------------------------------------"); + } + + public void setVerbose(boolean verbose) { + isVerbose = verbose; + } + + @Override + public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { + mStream.println(""); + mStream.println("Spark node " + sparkOp.getOperatorKey().toString()); + if (sparkOp instanceof NativeSparkOperator) { + mStream.println("--------"); + mStream.println(); + return; + } + if (sparkOp.physicalPlan != null && sparkOp.physicalPlan.size() > 0) { + 
PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>( + sparkOp.physicalPlan, mStream); + printer.setVerbose(isVerbose); + printer.visit(); + mStream.println("--------"); + } + } }
