Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Fri Dec 12 20:15:26 2014 @@ -37,13 +37,13 @@ import org.apache.pig.newplan.logical.ex import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema; public class LOCogroup extends LogicalRelationalOperator { - + // List of booleans specifying if any of the cogroups is inner private boolean[] mIsInner; - + // List of expressionPlans according to input private MultiMap<Integer,LogicalExpressionPlan> mExpressionPlans; - + /** * Enum for the type of group */ @@ -52,39 +52,92 @@ public class LOCogroup extends LogicalRe COLLECTED, // Collected group MERGE // Map-side CoGroup on sorted data }; - + private GROUPTYPE mGroupType; - - private LogicalFieldSchema groupKeyUidOnlySchema; - + + private LogicalFieldSchema groupKeyUidOnlySchema; + /* * This is a map storing Uids which have been generated for an input * This map is required to make the uids persistant between calls of * resetSchema and getSchema */ private Map<Integer,Long> generatedInputUids = new HashMap<Integer,Long>(); - + final static String GROUP_COL_NAME = "group"; - - /** + + /** * static constant to refer to the option of selecting a group type */ public final static Integer OPTION_GROUPTYPE = 1; - + + //the pivot value + private int pivot = -1; + //the index of the first field involves in ROLLUP + private int rollupFieldIndex = 0; + //the original index of the first field involves in ROLLUP in case it was moved to the end + //(if we have the combination of cube and rollup) + private int rollupOldFieldIndex = 0; + //the size of 
total fields that involve in CUBE clause + private int dimensionSize = 0; + + //number of algebraic function that used after rollup + private int nAlgebraic = 0; + + public void setPivot(int pvt) { + this.pivot = pvt; + } + + public int getPivot() { + return this.pivot; + } + + public void setDimensionSize(int ds) { + this.dimensionSize = ds; + } + + public int getDimensionSize() { + return this.dimensionSize; + } + + public void setNumberAlgebraic(int na) { + this.nAlgebraic = na; + } + + public int getNumberAlgebraic() { + return this.nAlgebraic; + } + + public void setRollupOldFieldIndex(int rofi) { + this.rollupOldFieldIndex = rofi; + } + + public int getRollupOldFieldIndex() { + return this.rollupOldFieldIndex; + } + + public void setRollupFieldIndex(int rfi) { + this.rollupFieldIndex = rfi; + } + + public int getRollupFieldIndex() { + return this.rollupFieldIndex; + } + /** * Constructor for use in defining rule patterns * @param plan */ public LOCogroup(LogicalPlan plan) { - super("LOCogroup", plan); + super("LOCogroup", plan); } - - public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> + + public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> expressionPlans, boolean[] isInner ) { this( plan, expressionPlans, GROUPTYPE.REGULAR, isInner ); } - public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> + public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> expressionPlans, GROUPTYPE groupType, boolean[] isInner) { super("LOCogroup", plan); this.mExpressionPlans = expressionPlans; @@ -93,7 +146,7 @@ public class LOCogroup extends LogicalRe } this.mGroupType = groupType; } - + /** * Given an expression plan this function returns a LogicalFieldSchema * that can be generated using this expression plan @@ -120,7 +173,7 @@ public class LOCogroup extends LogicalRe if (inputs == null) { throw new FrontendException(this, "Cannot get predecessor for " + this, 2233); } - + 
List<LogicalFieldSchema> fieldSchemaList = new ArrayList<LogicalFieldSchema>(); // See if we have more than one expression plans, if so the @@ -139,7 +192,7 @@ public class LOCogroup extends LogicalRe LogicalSchema keySchema = new LogicalSchema(); // We sort here to maintain the correct order of inputs for( Integer key : mExpressionPlans.keySet()) { - Collection<LogicalExpressionPlan> plans = + Collection<LogicalExpressionPlan> plans = mExpressionPlans.get(key); for( LogicalExpressionPlan plan : plans ) { @@ -175,14 +228,14 @@ public class LOCogroup extends LogicalRe break; } break; - } + } } if(mExpressionPlans.size() > 1){ //reset the uid, because the group column is associated with more // than one input groupKeySchema.resetUid(); } - + if (groupKeySchema==null) { throw new FrontendException(this, "Cannot get group key schema for " + this, 2234); } @@ -194,8 +247,8 @@ public class LOCogroup extends LogicalRe int counter = 0; for (Operator op : inputs) { LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema(); - - // Check if we already have calculated Uid for this bag for given + + // Check if we already have calculated Uid for this bag for given // input operator long bagUid; if (generatedInputUids.get(counter)!=null) @@ -204,15 +257,15 @@ public class LOCogroup extends LogicalRe bagUid = LogicalExpression.getNextUid(); generatedInputUids.put( counter, bagUid ); } - + LogicalFieldSchema newTupleFieldSchema = new LogicalFieldSchema( null, inputSchema, DataType.TUPLE, LogicalExpression.getNextUid()); - + LogicalSchema bagSchema = new LogicalSchema(); bagSchema.addField(newTupleFieldSchema); - + LogicalFieldSchema newBagFieldSchema = new LogicalFieldSchema( - ((LogicalRelationalOperator)op).getAlias(), bagSchema, + ((LogicalRelationalOperator)op).getAlias(), bagSchema, DataType.BAG, bagUid); fieldSchemaList.add( newBagFieldSchema ); @@ -222,7 +275,7 @@ public class LOCogroup extends LogicalRe schema = new LogicalSchema(); for(LogicalFieldSchema 
fieldSchema: fieldSchemaList) { schema.addField(fieldSchema); - } + } return schema; } @@ -239,32 +292,32 @@ public class LOCogroup extends LogicalRe public boolean isEqual(Operator other) throws FrontendException { if (other != null && other instanceof LOCogroup) { LOCogroup oc = (LOCogroup)other; - if( mGroupType == oc.mGroupType && - mIsInner.length == oc.mIsInner.length + if( mGroupType == oc.mGroupType && + mIsInner.length == oc.mIsInner.length && mExpressionPlans.size() == oc.mExpressionPlans.size() ) { for( int i = 0; i < mIsInner.length; i++ ) { if( mIsInner[i] != oc.mIsInner[i] ) { return false; } } - for( Integer key : mExpressionPlans.keySet() ) { + for( Integer key : mExpressionPlans.keySet() ) { if( ! oc.mExpressionPlans.containsKey(key) ) { return false; } - Collection<LogicalExpressionPlan> exp1 = + Collection<LogicalExpressionPlan> exp1 = mExpressionPlans.get(key); - Collection<LogicalExpressionPlan> exp2 = + Collection<LogicalExpressionPlan> exp2 = oc.mExpressionPlans.get(key); - if(! ( exp1 instanceof ArrayList<?> + if(! 
( exp1 instanceof ArrayList<?> || exp2 instanceof ArrayList<?> ) ) { throw new FrontendException( "Expected an ArrayList " + "of Expression Plans", 2235 ); } - ArrayList<LogicalExpressionPlan> expList1 = + ArrayList<LogicalExpressionPlan> expList1 = (ArrayList<LogicalExpressionPlan>) exp1; - ArrayList<LogicalExpressionPlan> expList2 = + ArrayList<LogicalExpressionPlan> expList2 = (ArrayList<LogicalExpressionPlan>) exp2; for (int i = 0; i < expList1.size(); i++) { @@ -282,37 +335,37 @@ public class LOCogroup extends LogicalRe public GROUPTYPE getGroupType() { return mGroupType; } - + public void resetGroupType() { mGroupType = GROUPTYPE.REGULAR; } - + /** - * Returns an Unmodifiable Map of Input Number to Uid + * Returns an Unmodifiable Map of Input Number to Uid * @return Unmodifiable Map<Integer,Long> */ public Map<Integer,Long> getGeneratedInputUids() { return Collections.unmodifiableMap( generatedInputUids ); } - + public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() { return mExpressionPlans; } - + public void setExpressionPlans(MultiMap<Integer,LogicalExpressionPlan> plans) { this.mExpressionPlans = plans; } - + public void setGroupType(GROUPTYPE gt) { mGroupType = gt; } - + public void setInnerFlags(boolean[] flags) { if( flags != null ) { mIsInner = Arrays.copyOf( flags, flags.length ); } } - + public boolean[] getInner() { return mIsInner; } @@ -322,7 +375,7 @@ public class LOCogroup extends LogicalRe groupKeyUidOnlySchema = null; generatedInputUids = new HashMap<Integer,Long>(); } - + public List<Operator> getInputs(LogicalPlan plan) { return plan.getPredecessors(this); }
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCube.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCube.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCube.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCube.java Fri Dec 12 20:15:26 2014 @@ -84,6 +84,16 @@ import org.apache.pig.newplan.logical.ex public class LOCube extends LogicalRelationalOperator { private MultiMap<Integer, LogicalExpressionPlan> mExpressionPlans; private List<String> operations; + //the pivot position + private int pivot = -1; + + public void setPivot(int pvt) { + this.pivot = pvt; + } + + public int getPivot() { + return this.pivot; + } public LOCube(LogicalPlan plan) { super("LOCube", plan); Added: pig/trunk/src/org/apache/pig/newplan/logical/relational/LORollupHIIForEach.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LORollupHIIForEach.java?rev=1645056&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LORollupHIIForEach.java (added) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LORollupHIIForEach.java Fri Dec 12 20:15:26 2014 @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.newplan.logical.relational; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Stack; + +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.Pair; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.PlanVisitor; +import org.apache.pig.newplan.ReverseDependencyOrderWalker; +import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; +import org.apache.pig.newplan.logical.expression.ProjectExpression; +import org.apache.pig.newplan.logical.optimizer.AllSameRalationalNodesVisitor; + +/** + * Rollup operator implementation for data rollup computation. This class + * provides a new ForEach logical operator to implement IRG and Hybrid IRG + * approaches for rollup computation. 
+ */ + +public class LORollupHIIForEach extends LOForEach { + + private static final long serialVersionUID = 2L; + + private LogicalPlan innerPlan; + //the pivot value + private int pivot = -1; + //the index of the first field involves in ROLLUP + private int rollupFieldIndex = 0; + //the original index of the first field involves in ROLLUP in case it was moved to the end + //(if we have the combination of cube and rollup) + private int rollupOldFieldIndex = 0; + //number of fields that involve in ROLLUP + private int rollupSize = 0; + //if we only use IRG, not Hybrid IRG+IRG + private boolean isOnlyIRG = false; + //the size of total fields that involve in CUBE clause + private int dimensionSize = 0; + + public LORollupHIIForEach(OperatorPlan plan) { + super(plan); + } + + public LORollupHIIForEach(LOForEach foreach) throws FrontendException { + super(foreach.getPlan()); + this.setInnerPlan(foreach.getInnerPlan()); + this.setRequestedParallelism(foreach.getRequestedParallelism()); + this.setAlias(foreach.getAlias()); + this.setSchema(foreach.getSchema()); + this.setLocation(foreach.getLocation()); + Iterator<Operator> its = foreach.getInnerPlan().getOperators(); + while (its.hasNext()) { + Operator opr = its.next(); + AttachPrjToNew(opr, this); + } + } + + public void setOnlyIRG() { + isOnlyIRG = true; + } + + public boolean getOnlyIRG() { + return isOnlyIRG; + } + + public void setRollupOldFieldIndex(int rofi) { + this.rollupOldFieldIndex = rofi; + } + + public int getRollupOldFieldIndex() { + return this.rollupOldFieldIndex; + } + + public void setRollupSize(int rs) { + this.rollupSize = rs; + } + + public int getRollupSize() { + return this.rollupSize; + } + + public void setRollupFieldIndex(int rfi) { + this.rollupFieldIndex = rfi; + } + + public int getRollupFieldIndex() { + return this.rollupFieldIndex; + } + + public void setPivot(int pvt) { + this.pivot = pvt; + } + + public int getPivot() { + return this.pivot; + } + + public void setDimensionSize(int ds) 
{ + this.dimensionSize = ds; + } + + public int getDimensionSize() { + return this.dimensionSize; + } + + /** + * Attach ProjectExpression from old LOForEach operator to new + * LORollupHIIForEach operator + * + * @param opr + * @param hfe + * @throws FrontendException + */ + private void AttachPrjToNew(Operator opr, LORollupHIIForEach hfe) throws FrontendException { + + if (opr instanceof LOGenerate) { + LOGenerate log = (LOGenerate) opr; + List<LogicalExpressionPlan> leps = log.getOutputPlans(); + for (LogicalExpressionPlan lep : leps) { + Iterator<Operator> its = lep.getOperators(); + while (its.hasNext()) { + Operator opr2 = its.next(); + if (opr2 instanceof ProjectExpression) { + if (((ProjectExpression) opr2).getAttachedRelationalOp() instanceof LOForEach) { + ((ProjectExpression) opr2).setAttachedRelationalOp(hfe); + } + Pair<List<LOInnerLoad>, Boolean> a = findReacheableInnerLoadFromBoundaryProject((ProjectExpression) opr2); + List<LOInnerLoad> innerLoads = a.first; + boolean needNewUid = a.second; + } + } + } + } else if (opr instanceof LOInnerLoad) { + LOInnerLoad loi = (LOInnerLoad) opr; + + if (loi.getProjection().getAttachedRelationalOp() instanceof LOForEach) + loi.getProjection().setAttachedRelationalOp(hfe); + } + } + + public LogicalPlan getInnerPlan() { + return innerPlan; + } + + public void setInnerPlan(LogicalPlan p) { + innerPlan = p; + } + + @Override + public boolean isEqual(Operator other) throws FrontendException { + if (!(other instanceof LORollupHIIForEach)) { + return false; + } + + return innerPlan.isEqual(((LORollupHIIForEach) other).innerPlan); + } + + @Override + public LogicalSchema getSchema() throws FrontendException { + List<Operator> ll = innerPlan.getSinks(); + if (ll != null) { + schema = ((LogicalRelationalOperator) ll.get(0)).getSchema(); + } + + return schema; + } + + @Override + public void accept(PlanVisitor v) throws FrontendException { + if (!(v instanceof LogicalRelationalNodesVisitor)) { + throw new 
FrontendException("Expected LogicalPlanVisitor", 2222); + } + ((LogicalRelationalNodesVisitor) v).visit(this); + } + + public LogicalSchema dumpNestedSchema(String alias, String nestedAlias) throws FrontendException { + NestedRelationalOperatorFinder opFinder = new NestedRelationalOperatorFinder(innerPlan, nestedAlias); + opFinder.visit(); + + if (opFinder.getMatchedOperator() != null) { + LogicalSchema nestedSc = opFinder.getMatchedOperator().getSchema(); + return nestedSc; + } + return null; + } + + private static class NestedRelationalOperatorFinder extends AllSameRalationalNodesVisitor { + String aliasOfOperator; + LogicalRelationalOperator opFound = null; + + public NestedRelationalOperatorFinder(LogicalPlan plan, String alias) throws FrontendException { + super(plan, new ReverseDependencyOrderWalker(plan)); + aliasOfOperator = alias; + } + + public LogicalRelationalOperator getMatchedOperator() { + return opFound; + } + + @Override + public void execute(LogicalRelationalOperator op) throws FrontendException { + if (op.getAlias() != null && op.getAlias().equals(aliasOfOperator)) + opFound = op; + } + } +} Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Dec 12 20:15:26 2014 @@ -58,6 +58,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; @@ -900,6 +901,128 @@ public class LogToPhyTranslationVisitor translateSoftLinks(foreach); } + @Override + public void visit(LORollupHIIForEach hforeach) throws FrontendException { + String scope = DEFAULT_SCOPE; + + List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>(); + + org.apache.pig.newplan.logical.relational.LogicalPlan inner = hforeach.getInnerPlan(); + LOGenerate gen = (LOGenerate) inner.getSinks().get(0); + + List<LogicalExpressionPlan> exps = gen.getOutputPlans(); + List<Operator> preds = inner.getPredecessors(gen); + + currentPlans.push(currentPlan); + + // we need to translate each predecessor of LOGenerate into a physical plan. 
+ // The physical plan should contain the expression plan for this predecessor plus + // the subtree starting with this predecessor + for (int i = 0; i < exps.size(); i++) { + currentPlan = new PhysicalPlan(); + // translate the expression plan + PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(exps.get(i)); + pushWalker(childWalker); + childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i), childWalker, gen, + currentPlan, logToPhyMap)); + popWalker(); + + List<Operator> leaves = exps.get(i).getSinks(); + for (Operator l : leaves) { + PhysicalOperator op = logToPhyMap.get(l); + if (l instanceof ProjectExpression) { + int input = ((ProjectExpression) l).getInputNum(); + + // for each sink projection, get its input logical plan and translate it + Operator pred = preds.get(input); + childWalker = new SubtreeDependencyOrderWalker(inner, pred); + pushWalker(childWalker); + childWalker.walk(this); + popWalker(); + + // get the physical operator of the leaf of input logical plan + PhysicalOperator leaf = logToPhyMap.get(pred); + + if (pred instanceof LOInnerLoad) { + // if predecessor is only an LOInnerLoad, remove the project that + // comes from LOInnerLoad and change the column of project that + // comes from expression plan + currentPlan.remove(leaf); + logToPhyMap.remove(pred); + + POProject leafProj = (POProject) leaf; + try { + if (leafProj.isStar()) { + ((POProject) op).setStar(true); + } else if (leafProj.isProjectToEnd()) { + ((POProject) op).setProjectToEnd(leafProj.getStartCol()); + } else { + ((POProject) op).setColumn(leafProj.getColumn()); + } + + } catch (ExecException e) { + throw new FrontendException(hforeach, "Cannot get column from " + leaf, + 2230, e); + } + + } else { + currentPlan.connect(leaf, op); + } + } + } + innerPlans.add(currentPlan); + } + + currentPlan = currentPlans.pop(); + + // PhysicalOperator poGen = new POGenerate(new OperatorKey("", + // r.nextLong()), inputs, toBeFlattened); + boolean[] flatten = 
gen.getFlattenFlags(); + List<Boolean> flattenList = new ArrayList<Boolean>(); + for (boolean fl : flatten) { + flattenList.add(fl); + } + // Create new PORollupHIIForEach for translation from Logical Plan to Physical Plan + PORollupHIIForEach poHFE = new PORollupHIIForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), + hforeach.getRequestedParallelism(), innerPlans, flattenList); + + // if the pivot position is zero, set the pivot position for physical op is zero + if(hforeach.getPivot() == 0) + poHFE.setPivot(0); + //else, decrease pivot position by one, because the position user specified and the + //rollup field index is different by one + else + poHFE.setPivot(hforeach.getPivot() - 1); + //get the start field index and size of rollup position in case the rollup does not stand at the front + poHFE.setRollupFieldIndex(hforeach.getRollupFieldIndex()); + poHFE.setRollupOldFieldIndex(hforeach.getRollupOldFieldIndex()); + poHFE.setRollupSize(hforeach.getRollupSize()); + poHFE.setDimensionSize(hforeach.getDimensionSize()); + + poHFE.addOriginalLocation(hforeach.getAlias(), hforeach.getLocation()); + poHFE.setResultType(DataType.BAG); + logToPhyMap.put(hforeach, poHFE); + currentPlan.add(poHFE); + + // generate cannot have multiple inputs + List<Operator> op = hforeach.getPlan().getPredecessors(hforeach); + + // generate may not have any predecessors + if (op == null) + return; + + PhysicalOperator from = logToPhyMap.get(op.get(0)); + try { + currentPlan.connect(from, poHFE); + } catch (Exception e) { + int errCode = 2015; + String msg = "Invalid physical operators in the physical plan"; + throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); + } + + translateSoftLinks(hforeach); + } + /** * This function takes in a List of LogicalExpressionPlan and converts them to * a list of PhysicalPlans @@ -1010,6 +1133,19 @@ public class LogToPhyTranslationVisitor case REGULAR: POPackage poPackage = compileToLR_GR_PackTrio(cg, 
cg.getCustomPartitioner(), cg.getInner(), cg.getExpressionPlans()); poPackage.getPkgr().setPackageType(PackageType.GROUP); + if(cg.getPivot()!=-1) { + //Set the pivot value + poPackage.setPivot(cg.getPivot()); + //Set the size of total fields that involve in CUBE clause + poPackage.setDimensionSize(cg.getDimensionSize()); + //Set the index of the first field involves in ROLLUP + poPackage.setRollupFieldIndex(cg.getRollupFieldIndex()); + //Set the original index of the first field involves in ROLLUP in case it was moved to the end + //(if we have the combination of cube and rollup) + poPackage.setRollupOldFieldIndex(cg.getRollupOldFieldIndex()); + //Set number of algebraic functions that used after rollup + poPackage.setNumberAlgebraic(cg.getNumberAlgebraic()); + } logToPhyMap.put(cg, poPackage); break; case MERGE: Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Fri Dec 12 20:15:26 2014 @@ -263,6 +263,7 @@ public class LogicalPlan extends BaseOpe disabledOptimizerRules.add("ColumnMapKeyPrune"); disabledOptimizerRules.add("AddForEach"); disabledOptimizerRules.add("GroupByConstParallelSetter"); + disabledOptimizerRules.add("RollupHIIOptimizer"); } try { Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- 
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java Fri Dec 12 20:15:26 2014 @@ -57,6 +57,9 @@ public abstract class LogicalRelationalN public void visit(LOForEach foreach) throws FrontendException { } + public void visit(LORollupHIIForEach horeach) throws FrontendException { + } + public void visit(LOGenerate gen) throws FrontendException { } Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java Fri Dec 12 20:15:26 2014 @@ -27,6 +27,7 @@ import org.apache.pig.newplan.logical.ex import org.apache.pig.newplan.logical.expression.UserFuncExpression; import org.apache.pig.newplan.logical.relational.LOForEach; import org.apache.pig.newplan.logical.relational.LOGenerate; +import org.apache.pig.newplan.logical.relational.LORollupHIIForEach; import org.apache.pig.newplan.logical.relational.LogicalPlan; public class OptimizerUtils { @@ -41,6 +42,16 @@ public class OptimizerUtils { } /** + * Find generate op from the rolluphiiforeach operator. + * @param foreach the LORollupHIIForEach instance + * @return LOGenerate instance + */ + public static LOGenerate findGenerate(LORollupHIIForEach hfe) { + LogicalPlan inner = hfe.getInnerPlan(); + return (LOGenerate) inner.getSinks().get(0); + } + + /** * Check if a given LOGenerate operator has any flatten fields. 
* @param gen the given LOGenerate instance * @return true if LOGenerate instance contains flatten fields, false otherwise Added: pig/trunk/src/org/apache/pig/newplan/logical/rules/RollupHIIOptimizer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/RollupHIIOptimizer.java?rev=1645056&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/rules/RollupHIIOptimizer.java (added) +++ pig/trunk/src/org/apache/pig/newplan/logical/rules/RollupHIIOptimizer.java Fri Dec 12 20:15:26 2014 @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.newplan.logical.rules; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.pig.Algebraic; +import org.apache.pig.EvalFunc; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.RollupHIIPartitioner; +import org.apache.pig.builtin.RollupDimensions; +import org.apache.pig.data.DataType; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.OperatorSubPlan; +import org.apache.pig.newplan.logical.expression.CastExpression; +import org.apache.pig.newplan.logical.expression.DereferenceExpression; +import org.apache.pig.newplan.logical.expression.LogicalExpression; +import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; +import org.apache.pig.newplan.logical.expression.ProjectExpression; +import org.apache.pig.newplan.logical.expression.UserFuncExpression; +import org.apache.pig.newplan.logical.relational.LOCogroup; +import org.apache.pig.newplan.logical.relational.LOForEach; +import org.apache.pig.newplan.logical.relational.LOGenerate; +import org.apache.pig.newplan.logical.relational.LORollupHIIForEach; +import org.apache.pig.newplan.logical.relational.LOInnerLoad; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema; +import org.apache.pig.newplan.optimizer.Rule; +import org.apache.pig.newplan.optimizer.Transformer; + +public class RollupHIIOptimizer extends Rule { + + private OperatorSubPlan subPlan; + + private int nRollup = 0; + + private static final String ROLLUP_UDF = RollupDimensions.class.getName(); + private static final String ROLLUP_PARTITIONER = RollupHIIPartitioner.class.getName(); + + public 
RollupHIIOptimizer(String n) { + super(n, false); + } + + @Override + protected OperatorPlan buildPattern() { + + LogicalPlan plan = new LogicalPlan(); + LOForEach foreach = new LOForEach(plan); + LOCogroup groupby = new LOCogroup(plan); + + plan.add(foreach); + plan.add(groupby); + plan.connect(foreach, groupby); + + return plan; + } + + @Override + public Transformer getNewTransformer() { + return new RollupTransformer(); + } + + public class RollupTransformer extends Transformer { + + /** + * User defined schema for generate operator. If not specified output + * schema of UDF will be used which will prefix "dimensions" namespace + * to all fields + * + * @param input + * @return List<LogicalSchema> + * @throws FrontendException + */ + private List<LogicalSchema> getUserDefinedSchema(List<LogicalExpressionPlan> input) throws FrontendException { + List<LogicalSchema> result = new ArrayList<LogicalSchema>(); + for (int i = 0; i < input.size(); i++) { + // get sources of input (roots) + List<Operator> sources = input.get(i).getSources(); + // iterate sources list + for (Operator op : sources) { + + if (op instanceof ProjectExpression) { + LogicalSchema output = new LogicalSchema(); + output.addField(new LogicalFieldSchema(((ProjectExpression) op).getColAlias(), null, + DataType.NULL)); + result.add(output); + } else if (op instanceof UserFuncExpression) { + LogicalSchema output = new LogicalSchema(); + for (Operator new_op : ((UserFuncExpression) op).getPlan().getSinks()) { + output.addField(new LogicalFieldSchema(((ProjectExpression) new_op).getFieldSchema())); + } + result.add(output); + } else if (op instanceof CastExpression) { + LogicalSchema output = new LogicalSchema(); + output.addField(new LogicalFieldSchema(((CastExpression) op).getFieldSchema())); + result.add(output); + } else if (op instanceof DereferenceExpression) { + LogicalSchema output = new LogicalSchema(); + output.addField(new LogicalFieldSchema(((ProjectExpression) ((DereferenceExpression) op) 
+ .getReferredExpression()).getColAlias(), null, DataType.NULL)); + result.add(output); + } + + } + } + return result; + } + + @Override + /** + * Check if the plan operator and its sub-tree has a match to the pattern + * operator and its sub-tree. + */ + public boolean check(OperatorPlan matched) throws FrontendException { + + // Check if we have the Rollup operation or not + + LOForEach foreach1 = (LOForEach) matched.getSources().get(0); + + LOCogroup cogroup = (LOCogroup) matched.getSuccessors(foreach1).get(0); + + if (currentPlan.getSuccessors(cogroup) == null) + return false; + + boolean bRollup = false; + + Iterator<Operator> it = foreach1.getInnerPlan().getOperators(); + while (it.hasNext()) { + Operator op = it.next(); + if (op instanceof LOGenerate) { + List<LogicalExpressionPlan> inner_leplan = ((LOGenerate) op).getOutputPlans(); + for (LogicalExpressionPlan op2 : inner_leplan) { + for (Operator op3 : op2.getSources()) { + if (op3 instanceof UserFuncExpression) { + UserFuncExpression uf = (UserFuncExpression) op3; + //Count the number of rollup operation + if (uf.getFuncSpec().toString().equals(ROLLUP_UDF)) { + bRollup = true; + nRollup++; + } + List<LogicalExpression> inpUfs = uf.getArguments(); + for (LogicalExpression op4 : inpUfs) { + if (op4 instanceof ProjectExpression) { + ProjectExpression pe = (ProjectExpression) op4; + } else if (op4 instanceof CastExpression) { + CastExpression ce = (CastExpression) op4; + } else { + } + } + } else if (op3 instanceof ProjectExpression) { + ProjectExpression pe = (ProjectExpression) op3; + + } else if (op3 instanceof CastExpression) { + CastExpression ce = (CastExpression) op3; + } else { + } + } + } + + } else if (op instanceof LOInnerLoad) { + LOInnerLoad iltmp = (LOInnerLoad) op; + ProjectExpression pe = iltmp.getProjection(); + } + } + + // If we did not find out a rollup or there are more than + // rollup, this operator plan fails the checks + + if (!bRollup) + return false; + + // We check if our 
userfuncexpression can be applied with the + // optimization or not. + + List<Operator> succs = currentPlan.getSuccessors(cogroup); + + // check if the optimization if applicable with the function + boolean bOptimization = false; + + if (!succs.isEmpty() && succs.size() == 1) { + if (succs.get(0) instanceof LOForEach) { + LOForEach foreach2 = (LOForEach) succs.get(0); + it = foreach2.getInnerPlan().getOperators(); + while (it.hasNext()) { + Operator op = it.next(); + if (op instanceof LOGenerate) { + List<LogicalExpressionPlan> inner_leplan = ((LOGenerate) op).getOutputPlans(); + for (LogicalExpressionPlan loplan2 : inner_leplan) { + for (Operator op3 : loplan2.getSources()) { + if (op3 instanceof UserFuncExpression) { + UserFuncExpression uf = (UserFuncExpression) op3; + EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(uf + .getFuncSpec()); + + // check if the evaluate function is + // algebraic so that we + // can apply our optimization + if (ef instanceof Algebraic) { + bOptimization = true; + Operator op4 = loplan2.getSuccessors(op3).get(0); + if (op4 instanceof DereferenceExpression) { + DereferenceExpression deref = (DereferenceExpression) op4; + } + } else { + bOptimization = false; + } + } else { + + } + } + } + + } + } + } + } + + return bOptimization; + } + + @Override + /** + * If the OperatorPlan which was checked is matched the rule, transform it. 
+ */ + public void transform(OperatorPlan matched) throws FrontendException { + // TODO Auto-generated method stub + + // the original rollup index in comparison to others operations + // before being transformed + int rollupUFIndex = 0; + + // the number of fields that involve in the rollup operation + int rollupSize = 0; + + // the original position of the first field of the rollup operation + int rollupOldFieldIndex = 0; + + // number of fields that involve in the CUBE clause + int dimensionSize = 0; + + // the rollup first field index in comparison to others fields after + // being transformed + int rollupFieldIndex = 0; + + // number of user function + int ufSize = 0; + + // number of Algebraic functions that used after rollup + int nAlgebraic = 0; + + subPlan = new OperatorSubPlan(currentPlan); + + LOForEach foreach1 = (LOForEach) matched.getSources().get(0); + + List<LogicalFieldSchema> foreach_field_lst = foreach1.getSchema().getFields(); + + LOCogroup cogroup = (LOCogroup) matched.getSuccessors(foreach1).get(0); + + LOForEach foreach2 = (LOForEach) currentPlan.getSuccessors(cogroup).get(0); + + //Count number of Algebraic functions that used after rollup + //this will be used to create number of value in the marker tuple. 
+ Iterator<Operator> it2 = foreach2.getInnerPlan().getOperators(); + while(it2.hasNext()) { + Operator op = it2.next(); + if (op instanceof LOGenerate){ + List<LogicalExpressionPlan> inner_leplan = ((LOGenerate) op).getOutputPlans(); + for (LogicalExpressionPlan op2 : inner_leplan) { + for (Operator op3 : op2.getSources()) { + if (op3 instanceof UserFuncExpression) { + nAlgebraic++; + } + } + } + } + } + + Iterator<Operator> it = foreach1.getInnerPlan().getOperators(); + while (it.hasNext()) { + Operator op = it.next(); + if (op instanceof LOGenerate) { + List<LogicalExpressionPlan> innerLEPlan = ((LOGenerate) op).getOutputPlans(); + + // If there is a rollup and multiple cubes, we move + // the rollup operation to the end of the operations. + // Update the indexes of the rollup operation and its + // fields. + for (LogicalExpressionPlan op2 : innerLEPlan) { + for (Operator op3 : op2.getSources()) { + if (op3 instanceof UserFuncExpression) { + + ufSize++; + + UserFuncExpression uf = (UserFuncExpression) op3; + + dimensionSize += uf.getFieldSchema().schema.getFields().get(0).schema.getFields() + .size(); + + if (uf.getFuncSpec().toString().equals(ROLLUP_UDF)) { + + nRollup--; + if (nRollup == 0) { + // the original position of the first + // field of the rollup operation + rollupOldFieldIndex = dimensionSize + - uf.getFieldSchema().schema.getFields().get(0).schema.getFields() + .size(); + + LogicalFieldSchema first_rollup = uf.getFieldSchema().schema.getFields().get(0).schema + .getFields().get(0); + + for (LogicalFieldSchema a : foreach_field_lst) + if (a.alias.equals(first_rollup.alias)) { + rollupUFIndex = innerLEPlan.indexOf(op2); + break; + } + + rollupSize = uf.getFieldSchema().schema.getFields().get(0).schema.getFields() + .size(); + + uf.setPivot(cogroup.getPivot()); + uf.setRollupHIIOptimizable(true); + cogroup.setRollupFieldIndex(dimensionSize - rollupSize); + cogroup.setRollupOldFieldIndex(rollupOldFieldIndex); + } + } + } + } + } + + for 
(LogicalExpressionPlan op2 : innerLEPlan) + for (Operator op3 : op2.getSources()) { + if (op3 instanceof UserFuncExpression) { + UserFuncExpression uf = (UserFuncExpression) op3; + if (uf.getFuncSpec().toString().equals(ROLLUP_UDF)) { + cogroup.setRollupFieldIndex(dimensionSize - rollupSize); + cogroup.setDimensionSize(dimensionSize); + cogroup.setNumberAlgebraic(nAlgebraic); + } + } + } + + // Move the rollup operation to the end of the operation + // list in case it doensnt stand at the end + if (rollupUFIndex < ufSize - 1) { + LogicalExpressionPlan temp = innerLEPlan.get(rollupUFIndex); + + for (int l = rollupUFIndex; l < ufSize - 1; l++) + innerLEPlan.set(l, innerLEPlan.get(l + 1)); + + innerLEPlan.set(ufSize - 1, temp); + } + rollupFieldIndex = dimensionSize - rollupSize; + } + } + + // Change the default partitioner to the RollupHII partitioner + cogroup.setCustomPartitioner(ROLLUP_PARTITIONER); + + // Create a new LORollupHIIForEach logical operator + LORollupHIIForEach hfe = new LORollupHIIForEach(foreach2); + + // Setup the pivot position for the new LORollupHIIForEach logical + // operator. Setup the old and new rollup index. + hfe.setPivot(cogroup.getPivot()); + + if (cogroup.getPivot() == 0) { + hfe.setOnlyIRG(); + } + + hfe.setRollupFieldIndex(rollupFieldIndex); + hfe.setRollupOldFieldIndex(rollupOldFieldIndex); + hfe.setRollupSize(rollupSize); + hfe.setDimensionSize(dimensionSize); + + // Replace the old LOForEach to our new LORollupHIIForEach. + // Transformation done. 
+ currentPlan.replace(foreach2, hfe); + + subPlan.add(foreach1); + subPlan.add(hfe); + } + + @Override + public OperatorPlan reportChanges() { + + // TODO Auto-generated method stub + return subPlan; + } + + } +} Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original) +++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Fri Dec 12 20:15:26 2014 @@ -247,6 +247,10 @@ cube_clause : ^( CUBE cube_item ) ; +pivot_clause + : ^( PIVOT INTEGER ) +; + cube_item : rel ( cube_by_clause ) ; @@ -260,7 +264,7 @@ cube_or_rollup ; cube_rollup_list - : ^( ( CUBE | ROLLUP ) cube_by_expr_list ) + : ^( CUBE cube_by_expr_list ) | ^( ROLLUP cube_by_expr_list pivot_clause? ) ; cube_by_expr_list @@ -642,6 +646,7 @@ eid : rel_str_op | FOREACH | CUBE | ROLLUP + | PIVOT | MATCHES | ORDER | RANK Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original) +++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Fri Dec 12 20:15:26 2014 @@ -106,6 +106,10 @@ parallel_clause : ^( PARALLEL INTEGER ) { sb.append(" ").append($PARALLEL.text).append(" ").append($INTEGER.text); } ; +pivot_clause + : ^( PIVOT INTEGER ) { sb.append(" ").append($PIVOT.text).append(" ").append($INTEGER.text); } +; + alias : IDENTIFIER { sb.append($IDENTIFIER.text); } ; @@ -262,7 +266,7 @@ cube_or_rollup ; cube_rollup_list - : ^( ( CUBE { sb.append($CUBE.text).append("("); } | ROLLUP { sb.append($ROLLUP.text).append("("); } ) cube_by_expr_list { sb.append(")"); }) + : ^( CUBE { 
sb.append($CUBE.text).append("("); } cube_by_expr_list { sb.append(")"); } ) | ^( ROLLUP { sb.append($ROLLUP.text).append("("); } cube_by_expr_list { sb.append(")"); } pivot_clause? ) ; cube_by_expr_list @@ -270,7 +274,7 @@ cube_by_expr_list ; cube_by_expr - : col_range | expr | STAR { sb.append($STAR.text); } + : col_range | expr | STAR { sb.append($STAR.text); } { sb.append(" "); } ; group_clause @@ -672,6 +676,7 @@ eid : rel_str_op | FOREACH { sb.append($FOREACH.text); } | CUBE { sb.append($CUBE.text); } | ROLLUP { sb.append($ROLLUP.text); } + | PIVOT { sb.append($PIVOT.text); } | MATCHES { sb.append($MATCHES.text); } | ORDER { sb.append($ORDER.text); } | RANK { sb.append($RANK.text); } Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original) +++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Fri Dec 12 20:15:26 2014 @@ -296,6 +296,10 @@ cube_clause : ^( CUBE cube_item ) ; +pivot_clause + : ^( PIVOT INTEGER ) +; + cube_item : rel ( cube_by_clause ) ; @@ -309,7 +313,7 @@ cube_or_rollup ; cube_rollup_list - : ^( ( CUBE | ROLLUP ) cube_by_expr_list ) + : ^( CUBE cube_by_expr_list ) | ^( ROLLUP cube_by_expr_list pivot_clause?
) ; cube_by_expr_list @@ -663,6 +667,7 @@ eid : rel_str_op | FOREACH | CUBE | ROLLUP + | PIVOT | MATCHES | ORDER | RANK Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original) +++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Dec 12 20:15:26 2014 @@ -451,10 +451,30 @@ public class LogicalPlanBuilder { return new LOCube(plan); } + void setPivotRollupCubeOp(LOCube op, Integer pivot) throws ParserValidationException { + if(pivot!=null) + op.setPivot(pivot); + } + String buildCubeOp(SourceLocation loc, LOCube op, String alias, String inputAlias, List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans) throws ParserValidationException { + // check value of pivot if it is valid or not, if no pivot position + // is specified, the pivot at middle position will be chosen + try { + if(op.getPivot()!=-1) { + if (op.getPivot() < 0 || op.getPivot() >= expressionPlans.get(0).size()) { + FrontendException fe = new FrontendException("PIVOT is out of bound"); + throw fe; + } + } + else + op.setPivot((int)(Math.round(expressionPlans.get(0).size()/2.0))); + } catch (FrontendException e) { + throw new ParserValidationException(intStream, loc, e); + } + // check if continuously occurring cube operations be combined combineCubeOperations((ArrayList<String>) operations, expressionPlans); @@ -713,6 +733,7 @@ public class LogicalPlanBuilder { // build group by operator try { + groupby.setPivot(op.getPivot()); return buildGroupOp(loc, (LOCogroup) groupby, op.getAlias(), inpAliases, exprPlansCopy, GROUPTYPE.REGULAR, innerFlags, null); } catch (ParserValidationException pve) { Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original) +++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Fri Dec 12 20:15:26 2014 @@ -493,12 +493,20 @@ func_args returns[List<String> args] // It also outputs the order of operations i.e in this case CUBE operation followed by ROLLUP operation // These inputs are passed to buildCubeOp methods which then builds the logical plan for CUBE operator. // If user specifies STAR or RANGE expression for dimensions then it will be expanded inside buildCubeOp. +pivot_clause returns[int pivot] // Parses the optional 'PIVOT <integer>' child of a ROLLUP node and returns the integer via Integer.parseInt. NOTE(review): the comment block above documents cube_clause, not this rule. + : ^( PIVOT INTEGER ) + { + $pivot = Integer.parseInt( $INTEGER.text ); + } +; + cube_clause returns[String alias] scope { LOCube cubeOp; MultiMap<Integer, LogicalExpressionPlan> cubePlans; List<String> operations; int inputIndex; + int pivot; // NOTE(review): not referenced in the hunks shown here (the ROLLUP action uses $pivot_clause.pivot); confirm it is used elsewhere or remove } scope GScope; @init { @@ -548,7 +556,7 @@ cube_rollup_list returns[String operatio @init { $plans = new ArrayList<LogicalExpressionPlan>(); } - : ^( ( CUBE { $operation = "CUBE"; } | ROLLUP { $operation = "ROLLUP"; } ) cube_by_expr_list { $plans = $cube_by_expr_list.plans; } ) + : ^( CUBE { $operation = "CUBE"; } cube_by_expr_list { $plans = $cube_by_expr_list.plans; } ) | ^( ROLLUP { $operation = "ROLLUP"; } cube_by_expr_list { $plans = $cube_by_expr_list.plans; } pivot_clause?
{ if ($pivot_clause.tree!=null) builder.setPivotRollupCubeOp($cube_clause::cubeOp, $pivot_clause.pivot); } ) ; cube_by_expr_list returns[List<LogicalExpressionPlan> plans] @@ -1941,6 +1949,7 @@ eid returns[String id] : rel_str_op { $i | COGROUP { $id = $COGROUP.text; } | CUBE { $id = $CUBE.text; } | ROLLUP { $id = $ROLLUP.text; } + | PIVOT { $id = $PIVOT.text; } | JOIN { $id = $JOIN.text; } | CROSS { $id = $CROSS.text; } | UNION { $id = $UNION.text; } Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original) +++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Fri Dec 12 20:15:26 2014 @@ -153,6 +153,9 @@ ONSCHEMA : 'ONSCHEMA' PARALLEL : 'PARALLEL' ; +PIVOT : 'PIVOT' +; + PARTITION : 'PARTITION' ; Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original) +++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Fri Dec 12 20:15:26 2014 @@ -595,7 +595,10 @@ union_clause : UNION^ ONSCHEMA? rel_list cube_clause : CUBE rel BY cube_rollup_list ( COMMA cube_rollup_list )* -> ^( CUBE rel ^( BY cube_rollup_list+ ) ) ; -cube_rollup_list : ( CUBE | ROLLUP )^ LEFT_PAREN! real_arg ( COMMA! real_arg )* RIGHT_PAREN! +cube_rollup_list : ( CUBE^ LEFT_PAREN! real_arg ( COMMA! real_arg )* RIGHT_PAREN! ) | ( ROLLUP^ LEFT_PAREN! real_arg ( COMMA! real_arg )* RIGHT_PAREN! pivot_clause? ) +; + +pivot_clause : PIVOT^ INTEGER ; flatten_clause : FLATTEN^ LEFT_PAREN! expr RIGHT_PAREN!
