Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORollupHIIForEach.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORollupHIIForEach.java?rev=1645056&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORollupHIIForEach.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORollupHIIForEach.java Fri Dec 12 20:15:26 2014 @@ -0,0 +1,1423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Iterator; +import java.util.List; + +import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.data.AccumulativeBag; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; +import org.apache.pig.data.SchemaTupleClassGenerator.GenContext; +import org.apache.pig.data.SchemaTupleFactory; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.data.TupleMaker; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.pen.util.ExampleTuple; +import org.apache.pig.pen.util.LineageTracer; + +/** + * This class provides a new ForEach physical operator to handle the ROLLUP with + * hybrid IRG when the Rollup Optimizer is activated. 
It is almost identical to the
+ * POForEach class, except for some functions that implement the Hybrid IRG logic.
+ */
+// We intentionally skip type checking in backend for performance reasons
+@SuppressWarnings("unchecked")
+public class PORollupHIIForEach extends POForEach {
+    private static final long serialVersionUID = 1L;
+
+    protected List<PhysicalPlan> inputPlans;
+    protected List<PhysicalOperator> opsToBeReset;
+
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    // Since the plan has a generate, this needs to be maintained
+    // as the generate can potentially return multiple tuples for
+    // the same call.
+    protected boolean processingPlan = false;
+
+    // This holds the iterators of the databags given by the input expressions
+    // which need flattening.
+    transient protected Iterator<Tuple>[] its = null;
+
+    // This holds the outputs given out by the input expressions of any datatype
+    protected Object[] bags = null;
+
+    // This is the template which contains tuples and is flattened out in
+    // createTuple() to generate the final output
+    protected Object[] data = null;
+
+    // store result types of the plan leaves
+    protected byte[] resultTypes = null;
+
+    // store whether or not an accumulative UDF has terminated early
+    protected BitSet earlyTermination = null;
+
+    // array version of isToBeFlattened - this is purely
+    // for optimization - instead of calling isToBeFlattened.get(i)
+    // we can do the quicker array access - isToBeFlattenedArray[i].
+    // Also we can store "boolean" values rather than "Boolean" objects
+    // so we can also save on the Boolean.booleanValue() calls
+    protected boolean[] isToBeFlattenedArray;
+
+    ExampleTuple tIn = null;
+    protected int noItems;
+
+    protected PhysicalOperator[] planLeafOps = null;
+
+    protected transient AccumulativeTupleBuffer buffer;
+
+    protected Tuple inpTuple;
+
+    private Schema schema;
+
+    // start adding new variables
+
+    // The first tuple that stores the value of the previous rollup dimension
+    // for the first IRG
+    protected Tuple prevRollupDimension = null;
+
+    // The second tuple that stores the value of the previous rollup dimension
+    // for the second IRG
+    protected Tuple prevRollupDimension2 = null;
+
+    protected Tuple currentRollupDimension = null;
+
+    // This holds the payload values for the first IRG
+    protected DataBag[][] tmpResult;
+
+    // This holds the payload values for the second IRG
+    protected DataBag[][] tmpResult2;
+
+    // This holds the result tuples for the first IRG
+    protected Result[] returnRes;
+
+    // This holds the result tuples for the second IRG
+    protected Result[] returnRes2;
+
+    // To check whether we can work on the second IRG or not
+    protected boolean secondPass = false;
+
+    // The pivot position of the rollup operation
+    protected int pivot = -1;
+
+    // To check whether we have finished the first IRG or not
+    protected boolean finIRG1 = false;
+
+    // To check whether we have finished the second IRG or not
+    protected boolean finIRG2 = false;
+
+    protected int noUserFunc = 0;
+
+    // the total number of fields involved in the CUBE clause
+    protected int dimensionSize = 0;
+
+    // The variables below are used in case the rollup operation has been
+    // moved to the end of the operation list.
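+    // e.g. (hypothetical): a query that combines CUBE and ROLLUP moves the
+    // ROLLUP fields behind the plain CUBE fields in the grouping key;
+    // outputIndex remembers the original positions so createTuple() can
+    // restore the user-visible field order.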
+
+    // the index of the first field involved in ROLLUP
+    protected int rollupFieldIndex = 0;
+
+    // Used to check whether we are using IRG or IRG+IRG
+    protected boolean onlyIRG = false;
+
+    // Used to check the field at the pivot position
+    protected int conditionPosition = 0;
+
+    // the number of fields involved in ROLLUP
+    protected int rollupSize = 0;
+
+    // the original index of the first field involved in ROLLUP, in case it was
+    // moved to the end (if we have a combination of CUBE and ROLLUP)
+    protected int rollupOldFieldIndex = 0;
+
+    // This array stores the order of each field in the CUBE clause
+    // before ROLLUP is moved (in case the CUBE clause is a combination
+    // of CUBE and ROLLUP).
+    protected int outputIndex[] = null;
+
+    // Check whether we have already met the marker tuple or not
+    protected boolean finished = false;
+
+    protected static final BagFactory mBagFactory = BagFactory.getInstance();
+
+    // finish adding new variables
+
+    /**
+     * We create a template for outputting the fields of a tuple in case the
+     * rollup operation has been moved to the end of the operation list.
+     *
+     * @param len
+     */
+    public void outputIndexInit(int len) {
+        outputIndex = new int[len];
+        for (int i = 0; i < len - this.rollupOldFieldIndex; i++)
+            if (i < this.rollupOldFieldIndex)
+                outputIndex[i] = i;
+            else
+                outputIndex[i] = i + rollupSize;
+
+        int count = this.rollupOldFieldIndex;
+
+        for (int i = len - this.rollupFieldIndex; i < len; i++)
+            outputIndex[i] = count++;
+    }
+
+    public void setOnlyIRG() {
+        onlyIRG = true;
+    }
+
+    /**
+     * Set the original index of the first field of the rollup operation, in
+     * case the rollup operation has been moved to the end of the operation
+     * list.
+     *
+     * @param rofi
+     */
+    public void setRollupOldFieldIndex(int rofi) {
+        this.rollupOldFieldIndex = rofi;
+    }
+
+    public int getRollupOldFieldIndex() {
+        return this.rollupOldFieldIndex;
+    }
+
+    public void setRollupSize(int rs) {
+        this.rollupSize = rs;
+    }
+
+    public int getRollupSize() {
+        return this.rollupSize;
+    }
+
+    public void setDimensionSize(int ds) {
+        this.dimensionSize = ds;
+    }
+
+    public int getDimensionSize() {
+        return this.dimensionSize;
+    }
+
+    /**
+     * Set the updated index of the first field of the rollup operation, and
+     * also update the pivot position accordingly, in case the rollup
+     * operation has been moved to the end of the operation list.
+     *
+     * @param rfi
+     */
+    public void setRollupFieldIndex(int rfi) {
+        this.rollupFieldIndex = rfi;
+        pivot = pivot + rollupFieldIndex;
+        conditionPosition = pivot;
+    }
+
+    public int getRollupFieldIndex() {
+        return this.rollupFieldIndex;
+    }
+
+    public void setPivot(int pvt) {
+        this.pivot = pvt;
+    }
+
+    public int getPivot() {
+        return this.pivot;
+    }
+
+    public PORollupHIIForEach(OperatorKey k) {
+        this(k, -1, null, null);
+    }
+
+    public PORollupHIIForEach(OperatorKey k, int rp) {
+        this(k, rp, null, null);
+    }
+
+    public PORollupHIIForEach(OperatorKey k, List inp) {
+        this(k, -1, inp, null);
+    }
+
+    public PORollupHIIForEach(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened) {
+        super(k, rp);
+        setUpFlattens(isToBeFlattened);
+        this.inputPlans = inp;
+        opsToBeReset = new ArrayList<PhysicalOperator>();
+        getLeaves();
+    }
+
+    public PORollupHIIForEach(OperatorKey operatorKey, int requestedParallelism, List<PhysicalPlan> innerPlans,
+            List<Boolean> flattenList, Schema schema) {
+        this(operatorKey, requestedParallelism, innerPlans, flattenList);
+        this.schema = schema;
+    }
+
+    public void visit(PhyPlanVisitor v)
throws VisitorException { + v.visitPOForEach(this); + } + + @Override + public String name() { + return getAliasString() + "New Rollup HII For Each" + " (" + getFlatStr() + ")" + "[" + + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString(); + } + + String getFlatStr() { + if (isToBeFlattenedArray == null) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (Boolean b : isToBeFlattenedArray) { + sb.append(b); + sb.append(','); + } + if (sb.length() > 0) { + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + public boolean supportsMultipleInputs() { + return false; + } + + @Override + public boolean supportsMultipleOutputs() { + return false; + } + + @Override + public void setAccumulative() { + super.setAccumulative(); + for (PhysicalPlan p : inputPlans) { + Iterator<PhysicalOperator> iter = p.iterator(); + while (iter.hasNext()) { + PhysicalOperator po = iter.next(); + if (po instanceof ExpressionOperator || po instanceof PODistinct) { + po.setAccumulative(); + } + } + } + } + + @Override + public void setAccumStart() { + super.setAccumStart(); + for (PhysicalPlan p : inputPlans) { + Iterator<PhysicalOperator> iter = p.iterator(); + while (iter.hasNext()) { + PhysicalOperator po = iter.next(); + if (po instanceof ExpressionOperator || po instanceof PODistinct) { + po.setAccumStart(); + } + } + } + } + + @Override + public void setAccumEnd() { + super.setAccumEnd(); + for (PhysicalPlan p : inputPlans) { + Iterator<PhysicalOperator> iter = p.iterator(); + while (iter.hasNext()) { + PhysicalOperator po = iter.next(); + if (po instanceof ExpressionOperator || po instanceof PODistinct) { + po.setAccumEnd(); + } + } + } + } + + /** + * Compute the rollup operation for the first IRG + * + * @throws ExecException + */ + protected void computeRollup() throws ExecException { + int len = prevRollupDimension.size(); + int iMulti = -1; + int index = 0; + + //Check if the ROLLUP has been moved to the end in case + //there are CUBE and ROLLUP, update the index field that + //we need to do ROLLUP. + if (rollupFieldIndex != 0) + index = rollupFieldIndex - 1; + + if (prevRollupDimension.get(len - 1) != null) { + //find the maximum value that differs the currentRollupDimension and prevRollupDimension + //in case there is a combination of CUBE(s) and ROLLUP(s). 
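+            // e.g. (hypothetical data): prev = (u1, a, b) vs. current = (u2, a, b),
+            // where u* are leading CUBE fields: any difference in a field before
+            // rollupFieldIndex sets iMulti so that all buffered rollup levels
+            // (down to the pivot) are flushed by the loop below.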
+ for (int i = 0; i < rollupFieldIndex; i++) + if (DataType.compare(currentRollupDimension.get(i), prevRollupDimension.get(i)) != 0) { + iMulti = rollupFieldIndex - 1; + break; + } + + //Calculate the rollup if the currentRollupDimension and preRollupDimension are different + //in field that has index is in range of ROLLUP or there are multiple ROLLUP/CUBE or this is + //the call from finish() + for (int i = index; i < len - 1; i++) { + if (DataType.compare(currentRollupDimension.get(i), prevRollupDimension.get(i)) != 0 + || (rollupFieldIndex != 0 && finIRG1 == true) || iMulti != -1) { + + // find the maximum value of the first index, which differs + // the currentRollupDimension and prevRollupDimension, and the pivot + int iTemp = Math.max(i, iMulti); + int x = Math.max(iTemp, pivot); + + // create the missing tuples for rollup operation in the + // first IRG (in HII) or the IRG(in only IRG) + for (int j = len - 2; j >= x; j--) { + Tuple group = mTupleFactory.newTuple(); + for (int k = 0; k < len; k++) { + if (k <= j) { + group.append(prevRollupDimension.get(k)); + } else { + group.append(null); + } + } + + //Store the value of this tuple in the tmpResult + Tuple out = mTupleFactory.newTuple(); + out.append(group); + + for (int k = 0; k < noUserFunc; k++) + out.append(tmpResult[j + 1][k]); + + //call the processPlan for this new tuple to update + //the value in tmpResult. + attachInputToPlans(out); + returnRes[j + 1] = processPlan(j); + //After returning these missing tuples, clear them in the tmpResult. + for (int k = 0; k < noUserFunc; k++) { + tmpResult[j + 1][k].clear(); + } + } + break; + } + } + } else { + for (int j = 0; j < len; j++) + for (int k = 0; k < noUserFunc; k++) { + tmpResult[j][k].clear(); + } + } + } + + /** + * Compute the rollup operation for the second IRG + * + * @throws ExecException + */ + protected void computeRollup2() throws ExecException { + int len = prevRollupDimension2.size(); + + int index = 0; + int iMulti = -1; + + //Check if the ROLLUP has been moved to the end in case + //there are CUBE and ROLLUP, update the index field that + //we need to do ROLLUP. + if (rollupFieldIndex != 0) + index = rollupFieldIndex - 1; + + //find the maximum value that differs the currentRollupDimension and prevRollupDimension + //in case there is a combination of CUBE(s) and ROLLUP(s). + for (int i = 0; i < rollupFieldIndex; i++) + if (DataType.compare(currentRollupDimension.get(i), prevRollupDimension2.get(i)) != 0) { + iMulti = rollupFieldIndex - 1; + break; + } + + //Calculate the rollup if the currentRollupDimension and preRollupDimension2 are different + //in field that has index is in range of ROLLUP or there are multiple ROLLUP/CUBE or this is + //the call from finish() + for (int i = index; i < pivot; i++) { + if (DataType.compare(currentRollupDimension.get(i), prevRollupDimension2.get(i)) != 0 || finIRG2 == true + || iMulti != -1) { + int x = Math.max(iMulti, i); + // create the missing tuples for rollup operation in the second + // IRG + for (int j = pivot - 2; j >= x; j--) { + Tuple group = mTupleFactory.newTuple(); + for (int k = 0; k < len; k++) { + if (k <= j) { + group.append(prevRollupDimension2.get(k)); + } else { + group.append(null); + } + } + + //Store the value of this tuple in the tmpResult + Tuple out = mTupleFactory.newTuple(); + out.append(group); + for (int k = 0; k < noUserFunc; k++) + out.append(tmpResult2[j + 1][k]); + + //call the processPlan for this new tuple to update + //the value in tmpResult. 
+ attachInputToPlans(out); + returnRes2[j + 1] = processPlan(j); + //After returning these missing tuples, clear them in the tmpResult. + for (int k = 0; k < noUserFunc; k++) + tmpResult2[j + 1][k].clear(); + } + break; + } + } + } + + /** + * Call the final aggregation for the IRGs and return the results. + * + * @return returnRes + * @throws ExecException + */ + public Result[] finish() throws ExecException { + if (prevRollupDimension != null) { + + if (rollupFieldIndex == 0) + currentRollupDimension = mTupleFactory.newTuple(prevRollupDimension.size()); + + finIRG1 = true; + secondPass = false; + computeRollup(); + secondPass = true; + } + + //compute the final aggregation for the first IRG + if (prevRollupDimension2 == null && prevRollupDimension != null) { + //If there is only one ROLLUP, create new tuple + //If there is multiple ROLLUP or combination of + //ROLLUP and CUBE, we've already calculated the + //last tuple ((,,,,) for example) when we compute rollup + if (rollupFieldIndex == 0) + computeFinalAggregation(); + return returnRes; + } + + //if this is a IRG+IRG, we must compute the final aggregation for the + //second IRG. + if (secondPass) { + //If there is only one ROLLUP, create new tuple + //If there is multiple ROLLUP or combination of + //ROLLUP and CUBE, we've already calculated the + //last tuple ((,,,,) for example) when we compute rollup + if (rollupFieldIndex == 0) + currentRollupDimension = mTupleFactory.newTuple(prevRollupDimension2.size()); + finIRG2 = true; + computeRollup2(); + if (pivot != 0) + computeFinalAggregation2(); + } + + return returnRes; + } + + /** + * Compute the final aggregation for the second IRG + * + * @throws ExecException + */ + protected void computeFinalAggregation2() throws ExecException { + Tuple group = mTupleFactory.newTuple(); + //Create a tuple that all fields are null, do the rollup + //for the rest values and tuples that are still stored in + //returnRes2 and tmpResult2 + for (int k = 0; k < prevRollupDimension2.size(); k++) + group.append(null); + Tuple out = mTupleFactory.newTuple(); + out.append(group); + for (int k = 0; k < noUserFunc; k++) + out.append(tmpResult2[0][k]); + attachInputToPlans(out); + + if(returnRes == null) { + returnRes = new Result[dimensionSize]; + for (int i = 0; i < dimensionSize; i++) { + returnRes[i] = null; + } + } + + if (rollupFieldIndex == 0) + if(returnRes!=null) + returnRes[0] = processPlan(-1); + else + returnRes2[0] = processPlan(-1); + //Move all the not-null tuples from returnRes2 to returnRes, + //so the returnRes will contain all the remaining tuples from two IRGs. 
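+        // Slots that the first IRG already filled are kept as-is; returnRes2
+        // only fills the empty ones, so no first-IRG result is overwritten.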
+ for (int i = 0; i < prevRollupDimension2.size(); i++) + if (returnRes[i] == null && returnRes2[i] != null) + returnRes[i] = returnRes2[i]; + } + + /** + * Compute the final aggregation for the first IRG + * + * @throws ExecException + */ + protected void computeFinalAggregation() throws ExecException { + Tuple group = mTupleFactory.newTuple(); + //Create a tuple that all fields are null, do the rollup + //for the rest values and tuples that are still stored in + //returnRes and tmpResult + for (int k = 0; k < prevRollupDimension.size(); k++) + group.append(null); + Tuple out = mTupleFactory.newTuple(); + out.append(group); + for (int k = 0; k < noUserFunc; k++) + out.append(tmpResult[0][k]); + attachInputToPlans(out); + //-1 is passed to processPlan, this is the last tuple + //that this PORollupHIIForEach will output + returnRes[0] = processPlan(-1); + } + + private boolean isEarlyTerminated() { + return isEarlyTerminated; + } + + private void earlyTerminate() { + isEarlyTerminated = true; + } + + /** + * Calls getNext on the generate operator inside the nested physical plan + * and returns it maintaining an additional state to denote the begin and + * end of the nested plan processing. + **/ + @Override + public Result getNextTuple() throws ExecException { + try { + Result res = null; + Result inp = null; + // The nested plan is under processing + // So return tuples that the generate oper + // returns + + // Return the result if it's still also in the returnRes + if (prevRollupDimension != null) { + for (int i = prevRollupDimension.size() - 1; i >= 0; i--) + if (returnRes[i] != null) { + res = returnRes[i]; + returnRes[i] = null; + return res; + } + } + + // Return the result if it's still also in the returnRes2 + // We only go to the for loop if prevRollupDimension2 is not null + // and we have not called yet the finish function. + if (prevRollupDimension2 != null && !finished) { + for (int i = prevRollupDimension2.size() - 1; i >= 0; i--) + if (returnRes2[i] != null) { + res = returnRes2[i]; + returnRes2[i] = null; + return res; + } + } + + if (processingPlan) { + while (true) { + res = processPlan(currentRollupDimension.size() - 1); + + if (res.returnStatus == POStatus.STATUS_OK) { + return res; + } + if (res.returnStatus == POStatus.STATUS_EOP) { + processingPlan = false; + for (PhysicalPlan plan : inputPlans) { + plan.detachInput(); + } + break; + } + if (res.returnStatus == POStatus.STATUS_ERR) { + return res; + } + if (res.returnStatus == POStatus.STATUS_NULL) { + continue; + } + } + } + // The nested plan processing is done or is + // yet to begin. 
So process the input and start + // nested plan processing on the input tuple + // read + while (true) { + inp = processInput(); + if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) { + return inp; + } + if (inp.returnStatus == POStatus.STATUS_NULL) { + continue; + } + + inpTuple = (Tuple) inp.result; + + // Initiate the currentRollupDimension + currentRollupDimension = null; + + if (inp.returnStatus == POStatus.STATUS_EOP) { + return inp; + } + + int len = 0; + if (inpTuple.getType(0) == DataType.TUPLE) { + currentRollupDimension = (Tuple) inpTuple.get(0); + len = currentRollupDimension.size(); + + boolean checkLast = false; + + // The special record which has size larger than the default + // one to mark that we went through the last record, compute the + // final rollup aggregation by calling finish() +// if (prevRollupDimension != null) +// if (len > prevRollupDimension.size()) +// checkLast = true; + + if (len > dimensionSize) + checkLast = true; + + if (checkLast) { + // Get the reducer's index + int checkFirstReducer = (Integer) currentRollupDimension.get(len - 1); + Result tmp[] = finish(); + + if (tmp != null) { + finished = true; // called finished + res = new Result(); + // Change the NULL tuple to one of the remaining tuples + if(returnRes!=null) { + for(int i = dimensionSize - 1; i >=0; i--) + if(returnRes[i]!=null) { + res.result = returnRes[i].result; + res.returnStatus = POStatus.STATUS_OK; + returnRes[i] = null; + break; + } + } else { + for(int i = dimensionSize - 1; i >=0; i--) + if(returnRes2[i]!=null) { + res.result = returnRes2[i].result; + res.returnStatus = POStatus.STATUS_OK; + returnRes2[i] = null; + break; + } + } + // If this marker tuple is not for reducer0 + // so we just output the remaining results from the + // pivot to the end of the length + if (checkFirstReducer != 0) { + for (int i = 0; i < pivot; i++) + returnRes[i] = null; + if (pivot == 0) + returnRes[0] = null; + } + return res; + } + //tmp is null so this reducer received only the marker tuple + else{ + res = new Result(); + res.result = null; + res.returnStatus = POStatus.STATUS_EOP; + return res; + } + } + + // Initiate the output index template + if (outputIndex == null && rollupFieldIndex != 0) + this.outputIndexInit(len); + + } + + if (currentRollupDimension == null) { + res.returnStatus = POStatus.STATUS_ERR; + return res; + } + + // Check if the field at the (updated - in case we has moved the + // rollup operation to the end of the operation list) pivot + // position + // of the tuple is null or not. 
If it is not null, we compute + // the + // rollup using the first rollup, else, we compute the it using + // the second IRG + if (currentRollupDimension.get(conditionPosition) != null) { + secondPass = false; + if (prevRollupDimension != null) { + computeRollup(); + } else { + noUserFunc = 0; + for (int i = 0; i < noItems; i++) { + if ((planLeafOps[i]) instanceof POUserFunc) { + noUserFunc++; + } + } + tmpResult = new DataBag[len][noUserFunc]; + returnRes = new Result[len]; + + for (int i = 0; i < len; i++) { + for (int j = 0; j < noUserFunc; j++) { + tmpResult[i][j] = mBagFactory.newDefaultBag(); + } + returnRes[i] = null; + } + } + prevRollupDimension = currentRollupDimension; + } else { + secondPass = true; + if (prevRollupDimension2 != null) { + computeRollup2(); + } else { + noUserFunc = 0; + for (int i = 0; i < noItems; i++) { + if ((planLeafOps[i]) instanceof POUserFunc) { + noUserFunc++; + } + } + tmpResult2 = new DataBag[len][noUserFunc]; + returnRes2 = new Result[len]; + + for (int i = 0; i < len; i++) { + for (int j = 0; j < noUserFunc; j++) { + tmpResult2[i][j] = mBagFactory.newDefaultBag(); + } + returnRes2[i] = null; + } + } + prevRollupDimension2 = currentRollupDimension; + } + attachInputToPlans((Tuple) inp.result); + + for (PhysicalOperator po : opsToBeReset) { + po.reset(); + } + + if (isAccumulative()) { + for (int i = 0; i < inpTuple.size(); i++) { + if (inpTuple.getType(i) == DataType.BAG) { + // we only need to check one bag, because all the + // bags + // share the same buffer + buffer = ((AccumulativeBag) inpTuple.get(i)).getTuplebuffer(); + break; + } + } + + setAccumStart(); + while (true) { + if (!isEarlyTerminated() && buffer.hasNextBatch()) { + try { + buffer.nextBatch(); + } catch (IOException e) { + throw new ExecException(e); + } + } else { + inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0); + // buffer.clear(); + setAccumEnd(); + } + + if (!secondPass) + returnRes[0] = processPlan(currentRollupDimension.size() - 1); + else if (pivot == 0) + returnRes2[0] = processPlan(pivot); + else + returnRes2[0] = processPlan(pivot - 1); + + if (res.returnStatus == POStatus.STATUS_BATCH_OK) { + // attach same input again to process next batch + attachInputToPlans((Tuple) inp.result); + } else if (res.returnStatus == POStatus.STATUS_EARLY_TERMINATION) { + // if this bubbled up, then we just need to pass a + // null value through the pipe + // so that POUserFunc will properly return the + // values + attachInputToPlans(null); + earlyTerminate(); + } else { + break; + } + } + + } else { + // if we are still in IRG1, we compute the rollup + // and store it in the returnRes + if (!secondPass) + returnRes[0] = processPlan(currentRollupDimension.size() - 1); + // else, we process the rollup and store it in the + // returnRes2 + // if the pivot is zero, it's IRG, else, we process at the + // (pivot - 1) + // because the pivot position user specified is always + // larger than the + // index in the rollup fields by one. 
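+                    // e.g. (hypothetical): for ROLLUP(a,b,c,d) with pivot 2, the
+                    // user's "2" names field b (1-based), so the 0-based index
+                    // processed here is pivot - 1 = 1.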
+ else if (pivot == 0) + returnRes2[0] = processPlan(pivot); + else + returnRes2[0] = processPlan(pivot - 1); + } + + processingPlan = true; + + // We return the result that we stored in returnRes or + // returnRes2 + for (int i = currentRollupDimension.size() - 1; i >= 0; i--) { + if (!secondPass) { + if (returnRes[i] != null) { + res = returnRes[i]; + returnRes[i] = null; + break; + } + } else { + if (returnRes2[i] != null) { + res = returnRes2[i]; + returnRes2[i] = null; + break; + } + } + } + return res; + } + } catch (RuntimeException e) { + throw new ExecException("Error while executing RollupHIIForEach at " + this.getOriginalLocations(), e); + } + } + + private boolean isEarlyTerminated = false; + private TupleMaker<? extends Tuple> tupleMaker; + + private boolean knownSize = false; + + protected Result processPlan(int pos) throws ExecException { + if (schema != null && tupleMaker == null) { + // Note here that if SchemaTuple is currently turned on, then any + // UDF's in the chain + // must follow good practices. Namely, they should not append to the + // Tuple that comes + // out of an iterator (a practice which is fairly common, but is not + // recommended). + tupleMaker = SchemaTupleFactory.getInstance(schema, false, GenContext.FOREACH); + if (tupleMaker != null) { + knownSize = true; + } + } + if (tupleMaker == null) { + tupleMaker = TupleFactory.getInstance(); + } + + Result res = new Result(); + + // We check if all the databags have exhausted the tuples. If so we + // enforce the reading of new data by setting data and its to null + if (its != null) { + boolean restartIts = true; + for (int i = 0; i < noItems; ++i) { + if (its[i] != null && isToBeFlattenedArray[i] == true) { + restartIts &= !its[i].hasNext(); + } + } + // this means that all the databags have reached their last + // elements. 
so we need to force reading of fresh databags + if (restartIts) { + its = null; + data = null; + } + } + + if (its == null) { + // getNext being called for the first time OR starting with a set of + // new data from inputs + its = new Iterator[noItems]; + bags = new Object[noItems]; + earlyTermination = new BitSet(noItems); + + int cnt = 0; + + for (int i = 0; i < noItems; ++i) { + // Getting the iterators + // populate the input data + Result inputData = null; + switch (resultTypes[i]) { + case DataType.BAG: + case DataType.TUPLE: + case DataType.BYTEARRAY: + case DataType.MAP: + case DataType.BOOLEAN: + case DataType.INTEGER: + case DataType.DOUBLE: + case DataType.LONG: + case DataType.FLOAT: + case DataType.BIGINTEGER: + case DataType.BIGDECIMAL: + case DataType.DATETIME: + case DataType.CHARARRAY: + inputData = planLeafOps[i].getNext(resultTypes[i]); + // We stores the values that we want to compute the rollup + // in tmpResult for the first IRG and in tmpResult2 for the second IRG + if (((planLeafOps[i]) instanceof POUserFunc) && (inputData.result != null) && (pos != -1)) + if (!secondPass) { + tmpResult[pos][cnt++].add(mTupleFactory.newTuple(inputData.result)); + } else { + tmpResult2[pos][cnt++].add(mTupleFactory.newTuple(inputData.result)); + } + break; + default: { + int errCode = 2080; + String msg = "Foreach currently does not handle type " + DataType.findTypeName(resultTypes[i]); + throw new ExecException(msg, errCode, PigException.BUG); + } + + } + + // we accrue information about what accumulators have early + // terminated + // in the case that they all do, we can finish + if (inputData.returnStatus == POStatus.STATUS_EARLY_TERMINATION) { + if (!earlyTermination.get(i)) + earlyTermination.set(i); + + continue; + } + + if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) { + continue; + } + + if (inputData.returnStatus == POStatus.STATUS_EOP) { + // we are done with all the elements. Time to return. + its = null; + bags = null; + return inputData; + } + // if we see a error just return it + if (inputData.returnStatus == POStatus.STATUS_ERR) { + return inputData; + } + + // Object input = null; + + bags[i] = inputData.result; + + if (inputData.result instanceof DataBag && isToBeFlattenedArray[i]) { + its[i] = ((DataBag) bags[i]).iterator(); + } else { + its[i] = null; + } + } + } + + // if accumulating, we haven't got data yet for some fields, just return + if (isAccumulative() && isAccumStarted()) { + if (earlyTermination.cardinality() < noItems) { + res.returnStatus = POStatus.STATUS_BATCH_OK; + } else { + res.returnStatus = POStatus.STATUS_EARLY_TERMINATION; + } + return res; + } + + while (true) { + if (data == null) { + // getNext being called for the first time or starting on new + // input data + // we instantiate the template array and start populating it + // with data + data = new Object[noItems]; + for (int i = 0; i < noItems; ++i) { + if (isToBeFlattenedArray[i] && bags[i] instanceof DataBag) { + if (its[i].hasNext()) { + data[i] = its[i].next(); + } else { + // the input set is null, so we return. This is + // caught above and this function recalled with + // new inputs. 
+ its = null; + data = null; + res.returnStatus = POStatus.STATUS_NULL; + return res; + } + } else { + data[i] = bags[i]; + } + + } + if (getReporter() != null) { + getReporter().progress(); + } + // createTuple(data); + + res.result = createTuple(data); + + res.returnStatus = POStatus.STATUS_OK; + return res; + } else { + // we try to find the last expression which needs flattening and + // start iterating over it + // we also try to update the template array + for (int index = noItems - 1; index >= 0; --index) { + if (its[index] != null && isToBeFlattenedArray[index]) { + if (its[index].hasNext()) { + data[index] = its[index].next(); + res.result = createTuple(data); + res.returnStatus = POStatus.STATUS_OK; + return res; + } else { + its[index] = ((DataBag) bags[index]).iterator(); + data[index] = its[index].next(); + } + } + } + } + } + } + + /** + * We create a new tuple for the final flattened tuple, in case the rollup + * operation has been moved to the end of the operation list, we re-order + * the fields as the same order as the input's by using the outputIndex we + * initialized before. + * + * @param data: array that is the template for the final flattened tuple + * @return the final flattened tuple + */ + protected Tuple createTuple(Object[] data) throws ExecException { + Tuple out = tupleMaker.newTuple(); + Tuple temp = mTupleFactory.newTuple(); + int idx = 0; + for (int i = 0; i < data.length; ++i) { + Object in = data[i]; + + if ((isToBeFlattenedArray[i] || rollupFieldIndex != 0) && in instanceof Tuple) { + Tuple t = (Tuple) in; + int size = t.size(); + + //Output the fields'order in the tuple due to + //the outputIndex. In this case, the ROLLUP was + //moved to the end so we need to re-order the field + //inside a tuple to output the fields as the original orders. + if (rollupFieldIndex != 0) { + if (!isToBeFlattenedArray[i]) { + for (int j = 0; j < size; j++) + temp.append(t.get(outputIndex[j])); + Object inn = temp; + out.append(inn); + } else { + for (int j = 0; j < size; j++) + out.append(t.get(outputIndex[j])); + } + //There's no moving of ROLLUP to the end, we don't need + //to re-order the fields'index. + } else { + for (int j = 0; j < size; ++j) { + if (knownSize) { + out.set(idx++, t.get(j)); + } else { + out.append(t.get(j)); + } + } + } + } else { + if (knownSize) { + out.set(idx++, in); + } else { + out.append(in); + } + } + } + if (inpTuple != null) { + return illustratorMarkup(inpTuple, out, 0); + } else { + return illustratorMarkup2(data, out); + } + } + + /** + * Make a deep copy of this operator. 
+ * + * @throws CloneNotSupportedException + */ + @Override + public PORollupHIIForEach clone() throws CloneNotSupportedException { + List<PhysicalPlan> plans = new ArrayList<PhysicalPlan>(inputPlans.size()); + for (PhysicalPlan plan : inputPlans) { + plans.add(plan.clone()); + } + List<Boolean> flattens = null; + if (isToBeFlattenedArray != null) { + flattens = new ArrayList<Boolean>(isToBeFlattenedArray.length); + for (boolean b : isToBeFlattenedArray) { + flattens.add(b); + } + } + + List<PhysicalOperator> ops = new ArrayList<PhysicalOperator>(opsToBeReset.size()); + for (PhysicalOperator op : opsToBeReset) { + ops.add(op); + } + PORollupHIIForEach clone = new PORollupHIIForEach(new OperatorKey(mKey.scope, NodeIdGenerator + .getGenerator().getNextNodeId(mKey.scope)), requestedParallelism, plans, flattens); + clone.setOpsToBeReset(ops); + clone.setResultType(getResultType()); + clone.addOriginalLocation(alias, getOriginalLocations()); + return clone; + } + + protected void attachInputToPlans(Tuple t) { + // super.attachInput(t); + for (PhysicalPlan p : inputPlans) { + p.attachInput(t); + } + } + + public void getLeaves() { + if (inputPlans != null) { + int i = -1; + if (isToBeFlattenedArray == null) { + isToBeFlattenedArray = new boolean[inputPlans.size()]; + } + planLeafOps = new PhysicalOperator[inputPlans.size()]; + for (PhysicalPlan p : inputPlans) { + ++i; + PhysicalOperator leaf = p.getLeaves().get(0); + planLeafOps[i] = leaf; + if (leaf instanceof POProject && leaf.getResultType() == DataType.TUPLE + && ((POProject) leaf).isProjectToEnd()) { + isToBeFlattenedArray[i] = true; + } + } + } + // we are calculating plan leaves + // so lets reinitialize + reInitialize(); + } + + private void reInitialize() { + if (planLeafOps != null) { + noItems = planLeafOps.length; + resultTypes = new byte[noItems]; + for (int i = 0; i < resultTypes.length; i++) { + resultTypes[i] = planLeafOps[i].getResultType(); + } + } else { + noItems = 0; + resultTypes = null; + } + + if (inputPlans != null) { + for (PhysicalPlan pp : inputPlans) { + try { + ResetFinder lf = new ResetFinder(pp, opsToBeReset); + lf.visit(); + } catch (VisitorException ve) { + String errMsg = "Internal Error: Unexpected error looking for nested operators which need to be reset in FOREACH"; + throw new RuntimeException(errMsg, ve); + } + } + } + } + + public List<PhysicalPlan> getInputPlans() { + return inputPlans; + } + + public void setInputPlans(List<PhysicalPlan> plans) { + inputPlans = plans; + planLeafOps = null; + getLeaves(); + } + + public void addInputPlan(PhysicalPlan plan, boolean flatten) { + inputPlans.add(plan); + // add to planLeafOps + // copy existing leaves + PhysicalOperator[] newPlanLeafOps = new PhysicalOperator[planLeafOps.length + 1]; + for (int i = 0; i < planLeafOps.length; i++) { + newPlanLeafOps[i] = planLeafOps[i]; + } + // add to the end + newPlanLeafOps[planLeafOps.length] = plan.getLeaves().get(0); + planLeafOps = newPlanLeafOps; + + // add to isToBeFlattenedArray + // copy existing values + boolean[] newIsToBeFlattenedArray = new boolean[isToBeFlattenedArray.length + 1]; + for (int i = 0; i < isToBeFlattenedArray.length; i++) { + newIsToBeFlattenedArray[i] = isToBeFlattenedArray[i]; + } + // add to end + newIsToBeFlattenedArray[isToBeFlattenedArray.length] = flatten; + isToBeFlattenedArray = newIsToBeFlattenedArray; + + // we just added a leaf - reinitialize + reInitialize(); + } + + public void setToBeFlattened(List<Boolean> flattens) { + setUpFlattens(flattens); + } + + public List<Boolean> 
getToBeFlattened() { + List<Boolean> result = null; + if (isToBeFlattenedArray != null) { + result = new ArrayList<Boolean>(); + for (int i = 0; i < isToBeFlattenedArray.length; i++) { + result.add(isToBeFlattenedArray[i]); + } + } + return result; + } + + public boolean inProcessing() { + return processingPlan; + } + + protected void setUpFlattens(List<Boolean> isToBeFlattened) { + if (isToBeFlattened == null) { + isToBeFlattenedArray = null; + } else { + isToBeFlattenedArray = new boolean[isToBeFlattened.size()]; + int i = 0; + for (Iterator<Boolean> it = isToBeFlattened.iterator(); it.hasNext();) { + isToBeFlattenedArray[i++] = it.next(); + } + } + } + + /** + * Visits a pipeline and calls reset on all the nodes. Currently only pays + * attention to limit nodes, each of which need to be told to reset their + * limit. + */ + protected class ResetFinder extends PhyPlanVisitor { + + ResetFinder(PhysicalPlan plan, List<PhysicalOperator> toBeReset) { + super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan)); + } + + @Override + public void visitDistinct(PODistinct d) throws VisitorException { + // FIXME: add only if limit is present + opsToBeReset.add(d); + } + + @Override + public void visitLimit(POLimit limit) throws VisitorException { + opsToBeReset.add(limit); + } + + @Override + public void visitSort(POSort sort) throws VisitorException { + // FIXME: add only if limit is present + opsToBeReset.add(sort); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans + * .PhyPlanVisitor + * #visitProject(org.apache.pig.backend.hadoop.executionengine + * .physicalLayer.expressionOperators.POProject) + */ + @Override + public void visitProject(POProject proj) throws VisitorException { + if (proj instanceof PORelationToExprProject) { + opsToBeReset.add(proj); + } + } + } + + /** + * @return the opsToBeReset + */ + public List<PhysicalOperator> getOpsToBeReset() { + return opsToBeReset; + } + + /** + * @param opsToBeReset + * the opsToBeReset to set + */ + public void setOpsToBeReset(List<PhysicalOperator> opsToBeReset) { + this.opsToBeReset = opsToBeReset; + } + + protected Tuple illustratorMarkup2(Object[] in, Object out) { + if (illustrator != null) { + ExampleTuple tOut = new ExampleTuple((Tuple) out); + illustrator.getLineage().insert(tOut); + boolean synthetic = false; + for (Object tIn : in) { + synthetic |= ((ExampleTuple) tIn).synthetic; + illustrator.getLineage().union(tOut, (Tuple) tIn); + } + illustrator.addData(tOut); + int i; + for (i = 0; i < noItems; ++i) { + if (((DataBag) bags[i]).size() < 2) { + break; + } + } + if (i >= noItems && !illustrator.getEqClassesShared()) { + illustrator.getEquivalenceClasses().get(0).add(tOut); + } + tOut.synthetic = synthetic; + return tOut; + } else { + return (Tuple) out; + } + } + + @Override + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + if (illustrator != null) { + ExampleTuple tOut = new ExampleTuple((Tuple) out); + illustrator.addData(tOut); + if (!illustrator.getEqClassesShared()) { + illustrator.getEquivalenceClasses().get(0).add(tOut); + } + LineageTracer lineageTracer = illustrator.getLineage(); + lineageTracer.insert(tOut); + tOut.synthetic = ((ExampleTuple) in).synthetic; + lineageTracer.union((ExampleTuple) in, tOut); + return tOut; + } else { + return (Tuple) out; + } + } + +}
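For orientation, below is a minimal, self-contained sketch of the single-pass IRG (in-reducer grouping) idea that PORollupHIIForEach generalizes; the class, the sample data, and the sum aggregate are illustrative and not part of this patch. Rows arrive sorted on the rollup dimensions, one running aggregate is kept per rollup level, and whenever a dimension prefix changes the finished levels are flushed, deepest first:

public class IrgSketch {
    // Emit one rollup result: the first `level` dimension values, the rest null.
    static void emit(String[] row, int level, int dimCount, long sum) {
        StringBuilder key = new StringBuilder("(");
        for (int i = 0; i < dimCount; i++) {
            key.append(i < level ? row[i] : "null");
            if (i < dimCount - 1) key.append(",");
        }
        System.out.println(key + ") -> " + sum);
    }

    public static void main(String[] args) {
        // (dim0, dim1, measure) rows, already sorted on (dim0, dim1) --
        // the same precondition the reducer input of the rollup job satisfies.
        String[][] rows = { {"a","x","1"}, {"a","x","2"}, {"a","y","3"}, {"b","x","4"} };
        int dims = 2;
        long[] sums = new long[dims + 1];   // one running aggregate per rollup level
        String[] prev = null;
        for (String[] row : rows) {
            if (prev != null) {
                int diff = 0;               // first dimension where prev and row differ
                while (diff < dims && prev[diff].equals(row[diff])) diff++;
                for (int lvl = dims; lvl > diff; lvl--) {   // flush finished levels, deepest first
                    emit(prev, lvl, dims, sums[lvl]);
                    sums[lvl - 1] += sums[lvl];             // roll the aggregate up one level
                    sums[lvl] = 0;
                }
            }
            sums[dims] += Long.parseLong(row[dims]);
            prev = row;
        }
        for (int lvl = dims; lvl >= 1; lvl--) {             // end of input: flush everything
            emit(prev, lvl, dims, sums[lvl]);
            sums[lvl - 1] += sums[lvl];
        }
        emit(prev, 0, dims, sums[0]);                       // grand total: (null,null) -> 10
    }
}

PORollupHIIForEach interleaves two such passes over the same sorted stream, one above and one below the pivot, which is what the tmpResult/tmpResult2 buffers and the returnRes/returnRes2 arrays implement; the oversized marker tuple handled in getNextTuple() plays the role of the end-of-input flush performed by finish().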
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Fri Dec 12 20:15:26 2014 @@ -72,6 +72,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; @@ -236,6 +237,12 @@ public class PlanHelper { } @Override + public void visitPORollupHIIForEach(PORollupHIIForEach hfe) throws VisitorException { + super.visitPORollupHIIForEach(hfe); + visit(hfe); + } + + @Override public void visitUnion(POUnion un) throws VisitorException { super.visitUnion(un); visit(un); Modified: pig/trunk/src/org/apache/pig/builtin/RollupDimensions.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/RollupDimensions.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/RollupDimensions.java (original) +++ pig/trunk/src/org/apache/pig/builtin/RollupDimensions.java Fri Dec 12 20:15:26 2014 @@ -47,6 +47,10 @@ public class RollupDimensions extends Ev private static BagFactory bf = BagFactory.getInstance(); private static TupleFactory tf = TupleFactory.getInstance(); private final String allMarker; + // the pivot position + private int pivot = -1; + // to check if rollup is optimized or not + private boolean rollupHIIoptimizable = false; public RollupDimensions() { this(null); @@ -57,6 +61,18 @@ public class RollupDimensions extends Ev this.allMarker = allMarker; } + public void setRollupHIIOptimizable(boolean check) { + this.rollupHIIoptimizable = check; + } + + public boolean getRollupHIIOptimizable() { + return this.rollupHIIoptimizable; + } + + public void setPivot(int pvt) throws IOException { + this.pivot = pvt; + } + @Override public DataBag exec(Tuple tuple) throws IOException { List<Tuple> result = Lists.newArrayListWithCapacity(tuple.size() + 1); @@ -66,12 +82,32 @@ public class RollupDimensions extends Ev return bf.newDefaultBag(result); } - private void iterativelyRollup(List<Tuple> result, Tuple input) throws ExecException { - Tuple tempTup = tf.newTuple(input.getAll()); - for (int i = input.size() - 1; i >= 0; i--) { - tempTup.set(i, allMarker); - result.add(tf.newTuple(tempTup.getAll())); - } + private void iterativelyRollup(List<Tuple> result, Tuple input) + throws IOException { + + Tuple tempTup = tf.newTuple(input.getAll()); + + //if (this.rollupHIIoptimizable != 
null) { // rule is enabled + if (this.rollupHIIoptimizable == true) { + if (this.pivot == -1) // user did not specify the pivot position + // --> IRG approach + return; + else { // user did specify the pivot position --> IRG + IRG + if (this.pivot == 0) // we use the IRG approach + return; + else { // we use IRG+IRG approach + for (int i = this.pivot - 1; i < input.size(); i++) + tempTup.set(i, allMarker); + result.add(tf.newTuple(tempTup.getAll())); + } + } + } + else { // we can not optimize --> Vanilla approach + for (int i = input.size() - 1; i >= 0; i--) { + tempTup.set(i, allMarker); + result.add(tf.newTuple(tempTup.getAll())); + } + } } @Override Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Fri Dec 12 20:15:26 2014 @@ -60,6 +60,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.builtin.RollupDimensions; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -78,13 +79,13 @@ public class ExpToPhyTranslationVisitor // This value points to the current LogicalRelationalOperator we are working on protected LogicalRelationalOperator currentOp; - - public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan, + + public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) throws FrontendException { this(plan, new DependencyOrderWalker(plan), op, phyPlan, map); } - - public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, + + public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) throws FrontendException { super(plan, walker); currentOp = op; @@ -92,7 +93,7 @@ public class ExpToPhyTranslationVisitor currentPlan = phyPlan; currentPlans = new LinkedList<PhysicalPlan>(); } - + protected Map<Operator, PhysicalOperator> logToPhyMap; protected Deque<PhysicalPlan> currentPlans; @@ -102,7 +103,7 @@ public class ExpToPhyTranslationVisitor protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator(); protected PigContext pc; - + public void setPigContext(PigContext pc) { this.pc = pc; } @@ -110,13 +111,13 @@ public class ExpToPhyTranslationVisitor public PhysicalPlan getPhysicalPlan() { return currentPlan; } - - private void attachBinaryComparisonOperator( BinaryExpression op, + + private void attachBinaryComparisonOperator( BinaryExpression op, BinaryComparisonOperator exprOp ) throws FrontendException { // We dont have aliases in ExpressionOperators // exprOp.setAlias(op.getAlias()); - - + + exprOp.setOperandType(op.getLhs().getType()); 
exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs())); exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs())); @@ -140,13 +141,13 @@ public class ExpToPhyTranslationVisitor } } } - - private void attachBinaryExpressionOperator( BinaryExpression op, + + private void attachBinaryExpressionOperator( BinaryExpression op, BinaryExpressionOperator exprOp ) throws FrontendException { // We dont have aliases in ExpressionOperators // exprOp.setAlias(op.getAlias()); - - + + exprOp.setResultType(op.getLhs().getType()); exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs())); exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs())); @@ -173,81 +174,81 @@ public class ExpToPhyTranslationVisitor @Override public void visit( AndExpression op ) throws FrontendException { - + // System.err.println("Entering And"); BinaryComparisonOperator exprOp = new POAnd(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( OrExpression op ) throws FrontendException { - + // System.err.println("Entering Or"); BinaryComparisonOperator exprOp = new POOr(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( EqualExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new EqualToExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( NotEqualExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new NotEqualToExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( GreaterThanExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new GreaterThanExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( GreaterThanEqualExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new GTOrEqualToExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( LessThanExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new LessThanExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - - + + @Override public void visit( LessThanEqualExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new LTOrEqualToExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit(ProjectExpression op) throws FrontendException { POProject exprOp; - + if(op.getAttachedRelationalOp() instanceof LOGenerate && op.getPlan().getSuccessors(op)==null && !(op.findReferent() instanceof LOInnerLoad)) { exprOp = new PORelationToExprProject(new OperatorKey(DEFAULT_SCOPE, nodeGen @@ -256,7 +257,7 @@ public class ExpToPhyTranslationVisitor exprOp = new POProject(new OperatorKey(DEFAULT_SCOPE, nodeGen .getNextNodeId(DEFAULT_SCOPE))); } - + if (op.getFieldSchema()==null && op.isRangeOrStarProject()) exprOp.setResultType(DataType.TUPLE); else @@ -278,9 +279,9 @@ public class ExpToPhyTranslationVisitor // TODO implement 
this // exprOp.setOverloaded(op.getOverloaded()); logToPhyMap.put(op, exprOp); - currentPlan.add(exprOp); + currentPlan.add(exprOp); } - + @Override public void visit( MapLookupExpression op ) throws FrontendException { ExpressionOperator physOp = new POMapLookUp(new OperatorKey(DEFAULT_SCOPE, @@ -302,10 +303,10 @@ public class ExpToPhyTranslationVisitor throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); } } - + @Override public void visit(org.apache.pig.newplan.logical.expression.ConstantExpression op) throws FrontendException { - + // System.err.println("Entering Constant"); ConstantExpression ce = new ConstantExpression(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); @@ -318,7 +319,7 @@ public class ExpToPhyTranslationVisitor logToPhyMap.put(op, ce); // System.err.println("Exiting Constant"); } - + @Override public void visit( CastExpression op ) throws FrontendException { POCast pCast = new POCast(new OperatorKey(DEFAULT_SCOPE, nodeGen @@ -351,10 +352,10 @@ public class ExpToPhyTranslationVisitor throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); } } - + @Override public void visit( NotExpression op ) throws FrontendException { - + PONot pNot = new PONot(new OperatorKey(DEFAULT_SCOPE, nodeGen .getNextNodeId(DEFAULT_SCOPE))); // physOp.setAlias(op.getAlias()); @@ -374,7 +375,7 @@ public class ExpToPhyTranslationVisitor throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); } } - + @Override public void visit( IsNullExpression op ) throws FrontendException { POIsNull pIsNull = new POIsNull(new OperatorKey(DEFAULT_SCOPE, nodeGen @@ -408,7 +409,7 @@ public class ExpToPhyTranslationVisitor ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op .getExpression()); pNegative.setExpr(from); - pNegative.setResultType(op.getType()); + pNegative.setResultType(op.getType()); try { currentPlan.connect(from, pNegative); } catch (PlanException e) { @@ -417,60 +418,60 @@ public class ExpToPhyTranslationVisitor throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); } } - + @Override - public void visit( AddExpression op ) throws FrontendException { - BinaryExpressionOperator exprOp = new Add(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + public void visit( AddExpression op ) throws FrontendException { + BinaryExpressionOperator exprOp = new Add(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); + attachBinaryExpressionOperator(op, exprOp); } - + @Override - public void visit( RegexExpression op ) throws FrontendException { - BinaryExpressionOperator exprOp = new PORegexp(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + public void visit( RegexExpression op ) throws FrontendException { + BinaryExpressionOperator exprOp = new PORegexp(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); + attachBinaryExpressionOperator(op, exprOp); - + List<Operator> successors = op.getPlan().getSuccessors(op); if (successors.get(1) instanceof org.apache.pig.newplan.logical.expression.ConstantExpression) { ((PORegexp)exprOp).setConstExpr(true); } } - + @Override - public void visit( SubtractExpression op ) throws FrontendException { - BinaryExpressionOperator exprOp = new Subtract(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + public void visit( SubtractExpression op ) throws FrontendException { + BinaryExpressionOperator exprOp = new Subtract(new 
OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); + attachBinaryExpressionOperator(op, exprOp); } - + @Override - public void visit( MultiplyExpression op ) throws FrontendException { - BinaryExpressionOperator exprOp = new Multiply(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + public void visit( MultiplyExpression op ) throws FrontendException { + BinaryExpressionOperator exprOp = new Multiply(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); + attachBinaryExpressionOperator(op, exprOp); } - + @Override - public void visit( DivideExpression op ) throws FrontendException { - BinaryExpressionOperator exprOp = new Divide(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + public void visit( DivideExpression op ) throws FrontendException { + BinaryExpressionOperator exprOp = new Divide(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); + attachBinaryExpressionOperator(op, exprOp); } - + @Override - public void visit( ModExpression op ) throws FrontendException { - BinaryExpressionOperator exprOp = new Mod(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + public void visit( ModExpression op ) throws FrontendException { + BinaryExpressionOperator exprOp = new Mod(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); + attachBinaryExpressionOperator(op, exprOp); } - + @Override public void visit( BinCondExpression op ) throws FrontendException { - + POBinCond exprOp = new POBinCond( new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)) ); - + exprOp.setResultType(op.getType()); exprOp.setCond((ExpressionOperator) logToPhyMap.get(op.getCondition())); exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs())); @@ -495,17 +496,34 @@ public class ExpToPhyTranslationVisitor } } } - + @SuppressWarnings("unchecked") @Override - public void visit( UserFuncExpression op ) throws FrontendException { + public void visit( UserFuncExpression op ) throws FrontendException { Object f = PigContext.instantiateFuncFromSpec(op.getFuncSpec()); PhysicalOperator p; + String ROLLUP_UDF = RollupDimensions.class.getName(); if (f instanceof EvalFunc) { p = new POUserFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen .getNextNodeId(DEFAULT_SCOPE)), -1, null, op.getFuncSpec(), (EvalFunc) f); ((POUserFunc)p).setSignature(op.getSignature()); + if( op.getFuncSpec().toString().equals(ROLLUP_UDF)) { + //Set the pivot value + ((POUserFunc)p).setPivot(op.getPivot()); + if(op.getRollupHIIOptimizable()!=false) { + ((POUserFunc)p).setRollupHIIOptimizable(true); + //Set value for RollupHIIOptimizable and pivot of RollupDimension + EvalFunc<?> tmp = ((POUserFunc)p).getFunc(); + ((RollupDimensions)tmp).setRollupHIIOptimizable(true); + try { + ((RollupDimensions)tmp).setPivot(op.getPivot()); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } //reinitialize input schema from signature if (((POUserFunc)p).getFunc().getInputSchema() == null) { ((POUserFunc)p).setFuncInputSchema(op.getSignature()); @@ -535,7 +553,7 @@ public class ExpToPhyTranslationVisitor } } logToPhyMap.put(op, p); - + //We need to track all the scalars if( op instanceof ScalarExpression ) { Operator refOp = ((ScalarExpression)op).getImplicitReferencedOperator(); @@ -543,20 +561,20 @@ public class ExpToPhyTranslationVisitor } } - + @Override public void visit( DereferenceExpression op ) throws FrontendException { POProject exprOp = new POProject(new 
OperatorKey(DEFAULT_SCOPE, nodeGen .getNextNodeId(DEFAULT_SCOPE))); exprOp.setResultType(op.getType()); - exprOp.setColumns((ArrayList<Integer>)op.getBagColumns()); + exprOp.setColumns((ArrayList<Integer>)op.getBagColumns()); exprOp.setStar(false); logToPhyMap.put(op, exprOp); currentPlan.add(exprOp); - + PhysicalOperator from = logToPhyMap.get( op.getReferredExpression() ); - + if( from != null ) { currentPlan.connect(from, exprOp); } Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Fri Dec 12 20:15:26 2014 @@ -57,6 +57,26 @@ public class UserFuncExpression extends private static int sigSeq=0; private boolean viaDefine=false; //this represents whether the function was instantiate via a DEFINE statement or not + private boolean rollupHIIoptimizable = false; + //the pivot value + private int pivot = -1; + + public void setPivot(int pvt) { + this.pivot = pvt; + } + + public int getPivot() { + return this.pivot; + } + + public void setRollupHIIOptimizable(boolean check) { + this.rollupHIIoptimizable = check; + } + + public boolean getRollupHIIOptimizable() { + return this.rollupHIIoptimizable; + } + public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec) { super("UserFunc", plan); mFuncSpec = funcSpec; @@ -66,7 +86,6 @@ public class UserFuncExpression extends } } - public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec, List<LogicalExpression> args) { this( plan, funcSpec ); Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1645056&r1=1645055&r2=1645056&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Fri Dec 12 20:15:26 2014 @@ -44,6 +44,7 @@ import org.apache.pig.newplan.logical.ru import org.apache.pig.newplan.logical.rules.PredicatePushdownOptimizer; import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten; import org.apache.pig.newplan.logical.rules.PushUpFilter; +import org.apache.pig.newplan.logical.rules.RollupHIIOptimizer; import org.apache.pig.newplan.logical.rules.SplitFilter; import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter; import org.apache.pig.newplan.optimizer.PlanOptimizer; @@ -56,6 +57,7 @@ public class LogicalPlanOptimizer extend private boolean allRulesDisabled = false; private SetMultimap<RulesReportKey, String> rulesReport = TreeMultimap.create(); private PigContext pc = null; + private static final String MAPREDUCE_FW = "MAPREDUCE"; public LogicalPlanOptimizer(OperatorPlan p, int iterations, Set<String> turnOffRules) { this(p, iterations, turnOffRules, null); @@ -203,6 +205,20 @@ public class LogicalPlanOptimizer extend if (!s.isEmpty()) ls.add(s); + // RollupHIIOptimizer Set + // This set of rules for rollup hii + // If pig is not running in MR mode, this rule will be disabled + if (pc!=null) + if 
(pc.getExecType().toString().equals(MAPREDUCE_FW)) { + s = new HashSet<Rule>(); + // Optimize RollupHII + r = new RollupHIIOptimizer("RollupHIIOptimizer"); + checkAndAddRule(s, r); + if (!s.isEmpty()) + ls.add(s); + } else { + LOG.info("Not MR mode. RollupHIIOptimizer is disabled"); + } return ls; }
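To make the new RollupDimensions behavior concrete, the sketch below mirrors the three branches of the patched iterativelyRollup(); the class, the list-based tuples, and the "*" marker are illustrative (the real UDF fills a DataBag and uses the configured allMarker, which defaults to null):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RollupEmitSketch {
    // Mirror of the three branches in the patched iterativelyRollup().
    static List<List<String>> roll(List<String> in, boolean hiiOptimized, int pivot) {
        List<List<String>> out = new ArrayList<>();
        List<String> t = new ArrayList<>(in);
        if (hiiOptimized) {
            if (pivot <= 0) return out;      // IRG: the reducer derives all levels itself
            for (int i = pivot - 1; i < t.size(); i++) t.set(i, "*");
            out.add(new ArrayList<>(t));     // IRG+IRG: one seed tuple for the second pass
        } else {
            for (int i = t.size() - 1; i >= 0; i--) {   // vanilla: one tuple per level
                t.set(i, "*");
                out.add(new ArrayList<>(t));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> row = Arrays.asList("a", "b", "c");
        System.out.println(roll(row, false, -1)); // [[a, b, *], [a, *, *], [*, *, *]]
        System.out.println(roll(row, true, -1));  // []  -- IRG, nothing materialized
        System.out.println(roll(row, true, 2));   // [[a, *, *]] -- IRG+IRG seed tuple
    }
}

The rollup levels that the UDF no longer materializes on the map side are reconstructed on the reduce side by PORollupHIIForEach, which is the intent of the hybrid IRG optimization.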
