Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java Fri Mar 4 18:17:39 2016 @@ -28,12 +28,13 @@ import org.apache.avro.AvroRuntimeExcept import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.pig.LoadPushDown.RequiredField; import org.apache.pig.ResourceSchema; import org.apache.pig.LoadPushDown.RequiredFieldList; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataType; -import org.mortbay.log.Log; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -44,6 +45,7 @@ import com.google.common.collect.Sets; * and vice versa. */ public class AvroStorageSchemaConversionUtilities { + private static final Log LOG = LogFactory.getLog(AvroStorageSchemaConversionUtilities.class); /** * Determines the pig object type of the Avro schema. @@ -222,7 +224,7 @@ public class AvroStorageSchemaConversion switch(mapAvroSchema.getType()) { case RECORD: ResourceSchema innerResourceSchemaRecord = - avroSchemaToResourceSchema(fieldSchema.getValueType(), schemasInStack, + avroSchemaToResourceSchema(mapAvroSchema, schemasInStack, alreadyDefinedSchemas, allowRecursiveSchema); mapSchemaFields[0] = new ResourceSchema.ResourceFieldSchema(); mapSchemaFields[0].setType(DataType.TUPLE); @@ -235,7 +237,7 @@ public class AvroStorageSchemaConversion case MAP: case ARRAY: ResourceSchema innerResourceSchema = - avroSchemaToResourceSchema(fieldSchema.getValueType(), schemasInStack, + avroSchemaToResourceSchema(mapAvroSchema, schemasInStack, alreadyDefinedSchemas, allowRecursiveSchema); rf.setSchema(innerResourceSchema); break; @@ -606,7 +608,7 @@ public class AvroStorageSchemaConversion return null; } } catch (ExecException e) { - Log.warn("ExecException caught in newSchemaFromRequiredFieldList", e); + LOG.warn("ExecException caught in newSchemaFromRequiredFieldList", e); return null; } if (rf.getSubFields() == null) {
Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Fri Mar 4 18:17:39 2016 @@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericAr import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.IndexedRecord; +import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataByteArray; @@ -47,6 +48,7 @@ import java.util.Map; */ public final class AvroTupleWrapper <T extends IndexedRecord> implements Tuple { + private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class); /** * The Avro object wrapped in the pig Tuple. @@ -130,13 +132,17 @@ public final class AvroTupleWrapper <T e case BYTES: return new DataByteArray(((ByteBuffer) o).array()); case UNION: - return unionResolver(o); + return getPigObject(o); default: return o; } } - public static Object unionResolver(Object o) { + /** + * @param o An Avro object to convert to an equivalent type in Pig + * @return Equivalent Pig object + */ + public static Object getPigObject(Object o) { if (o instanceof org.apache.avro.util.Utf8) { return o.toString(); } else if (o instanceof IndexedRecord) { @@ -165,8 +171,7 @@ public final class AvroTupleWrapper <T e try { all.add(get(f.pos())); } catch (ExecException e) { - LogFactory.getLog(getClass()).error( - "could not process tuple with contents " + avroObject, e); + LOG.error("could not process tuple with contents " + avroObject, e); return null; } } Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Fri Mar 4 18:17:39 2016 @@ -60,7 +60,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.builtin.RollupDimensions; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -502,36 +501,24 @@ public class ExpToPhyTranslationVisitor public void visit( UserFuncExpression op ) throws FrontendException { Object f = PigContext.instantiateFuncFromSpec(op.getFuncSpec()); PhysicalOperator p; - String ROLLUP_UDF = RollupDimensions.class.getName(); if (f instanceof EvalFunc) { p = new POUserFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen .getNextNodeId(DEFAULT_SCOPE)), -1, null, op.getFuncSpec(), (EvalFunc) f); ((POUserFunc)p).setSignature(op.getSignature()); - if( op.getFuncSpec().toString().equals(ROLLUP_UDF)) { - //Set the pivot value - ((POUserFunc)p).setPivot(op.getPivot()); - if(op.getRollupHIIOptimizable()!=false) { - ((POUserFunc)p).setRollupHIIOptimizable(true); - //Set value for RollupHIIOptimizable and pivot of RollupDimension - EvalFunc<?> tmp = ((POUserFunc)p).getFunc(); - ((RollupDimensions)tmp).setRollupHIIOptimizable(true); - try { - ((RollupDimensions)tmp).setPivot(op.getPivot()); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } //reinitialize input schema from signature if (((POUserFunc)p).getFunc().getInputSchema() == null) { ((POUserFunc)p).setFuncInputSchema(op.getSignature()); + ((EvalFunc) f).setInputSchema(((POUserFunc)p).getFunc().getInputSchema()); } List<String> cacheFiles = ((EvalFunc)f).getCacheFiles(); if (cacheFiles != null) { ((POUserFunc)p).setCacheFiles(cacheFiles); } + List<String> shipFiles = ((EvalFunc)f).getShipFiles(); + if (shipFiles != null) { + ((POUserFunc)p).setShipFiles(shipFiles); + } } else { p = new POUserComparisonFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen .getNextNodeId(DEFAULT_SCOPE)), -1, Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Fri Mar 4 18:17:39 2016 @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.builtin.InvokerGenerator; @@ -57,26 +58,6 @@ public class UserFuncExpression extends private static int sigSeq=0; private boolean viaDefine=false; //this represents whether the function was instantiate via a DEFINE statement or not - private boolean rollupHIIoptimizable = false; - //the pivot value - private int pivot = -1; - - public void setPivot(int pvt) { - this.pivot = pvt; - } - - public int getPivot() { - return this.pivot; - } - - public void setRollupHIIOptimizable(boolean check) { - this.rollupHIIoptimizable = check; - } - - public boolean getRollupHIIOptimizable() { - return this.rollupHIIoptimizable; - } - public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec) { super("UserFunc", plan); mFuncSpec = funcSpec; @@ -86,6 +67,7 @@ public class UserFuncExpression extends } } + public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec, List<LogicalExpression> args) { this( plan, funcSpec ); @@ -241,10 +223,20 @@ public class UserFuncExpression extends } ef.setUDFContextSignature(signature); - Properties props = UDFContext.getUDFContext().getUDFProperties(ef.getClass()); Schema translatedInputSchema = Util.translateSchema(inputSchema); if(translatedInputSchema != null) { + Properties props = UDFContext.getUDFContext().getUDFProperties(ef.getClass()); props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema); + if (ef instanceof Algebraic) { + // In case of Algebraic func, set original inputSchema to Initial, + // Intermed, Final + for (String func : new String[]{((Algebraic)ef).getInitial(), + ((Algebraic)ef).getIntermed(), ((Algebraic)ef).getFinal()}) { + Class c = PigContext.instantiateFuncFromSpec(new FuncSpec(func)).getClass(); + props = UDFContext.getUDFContext().getUDFProperties(c); + props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema); + } + } } // Store inputSchema into the UDF context ef.setInputSchema(translatedInputSchema); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Fri Mar 4 18:17:39 2016 @@ -44,7 +44,6 @@ import org.apache.pig.newplan.logical.ru import org.apache.pig.newplan.logical.rules.PredicatePushdownOptimizer; import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten; import org.apache.pig.newplan.logical.rules.PushUpFilter; -import org.apache.pig.newplan.logical.rules.RollupHIIOptimizer; import org.apache.pig.newplan.logical.rules.SplitFilter; import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter; import org.apache.pig.newplan.optimizer.PlanOptimizer; @@ -57,7 +56,6 @@ public class LogicalPlanOptimizer extend private boolean allRulesDisabled = false; private SetMultimap<RulesReportKey, String> rulesReport = TreeMultimap.create(); private PigContext pc = null; - private static final String MAPREDUCE_FW = "MAPREDUCE"; public LogicalPlanOptimizer(OperatorPlan p, int iterations, Set<String> turnOffRules) { this(p, iterations, turnOffRules, null); @@ -205,20 +203,6 @@ public class LogicalPlanOptimizer extend if (!s.isEmpty()) ls.add(s); - // RollupHIIOptimizer Set - // This set of rules for rollup hii - // If pig is not running in MR mode, this rule will be disabled - if (pc!=null) - if (pc.getExecType().toString().equals(MAPREDUCE_FW)) { - s = new HashSet<Rule>(); - // Optimize RollupHII - r = new RollupHIIOptimizer("RollupHIIOptimizer"); - checkAndAddRule(s, r); - if (!s.isEmpty()) - ls.add(s); - } else { - LOG.info("Not MR mode. RollupHIIOptimizer is disabled"); - } return ls; } Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOCogroup.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Fri Mar 4 18:17:39 2016 @@ -70,60 +70,7 @@ public class LOCogroup extends LogicalRe * static constant to refer to the option of selecting a group type */ public final static Integer OPTION_GROUPTYPE = 1; - - //the pivot value - private int pivot = -1; - //the index of the first field involves in ROLLUP - private int rollupFieldIndex = 0; - //the original index of the first field involves in ROLLUP in case it was moved to the end - //(if we have the combination of cube and rollup) - private int rollupOldFieldIndex = 0; - //the size of total fields that involve in CUBE clause - private int dimensionSize = 0; - - //number of algebraic function that used after rollup - private int nAlgebraic = 0; - - public void setPivot(int pvt) { - this.pivot = pvt; - } - - public int getPivot() { - return this.pivot; - } - - public void setDimensionSize(int ds) { - this.dimensionSize = ds; - } - - public int getDimensionSize() { - return this.dimensionSize; - } - - public void setNumberAlgebraic(int na) { - this.nAlgebraic = na; - } - - public int getNumberAlgebraic() { - return this.nAlgebraic; - } - - public void setRollupOldFieldIndex(int rofi) { - this.rollupOldFieldIndex = rofi; - } - - public int getRollupOldFieldIndex() { - return this.rollupOldFieldIndex; - } - - public void setRollupFieldIndex(int rfi) { - this.rollupFieldIndex = rfi; - } - - public int getRollupFieldIndex() { - return this.rollupFieldIndex; - } - + /** * Constructor for use in defining rule patterns * @param plan Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOCube.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOCube.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOCube.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOCube.java Fri Mar 4 18:17:39 2016 @@ -84,16 +84,6 @@ import org.apache.pig.newplan.logical.ex public class LOCube extends LogicalRelationalOperator { private MultiMap<Integer, LogicalExpressionPlan> mExpressionPlans; private List<String> operations; - //the pivot position - private int pivot = -1; - - public void setPivot(int pvt) { - this.pivot = pvt; - } - - public int getPivot() { - return this.pivot; - } public LOCube(LogicalPlan plan) { super("LOCube", plan); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Mar 4 18:17:39 2016 @@ -58,7 +58,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; @@ -422,6 +421,7 @@ public class LogToPhyTranslationVisitor poSort = new POSort(new OperatorKey(scope, nodeGen .getNextNodeId(scope)), -1, null, newPhysicalPlan, newOrderPlan, null); + //poSort.setRequestedParallelism(loRank.getRequestedParallelism()); poSort.addOriginalLocation(loRank.getAlias(), loRank.getLocation()); @@ -901,128 +901,6 @@ public class LogToPhyTranslationVisitor translateSoftLinks(foreach); } - @Override - public void visit(LORollupHIIForEach hforeach) throws FrontendException { - String scope = DEFAULT_SCOPE; - - List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>(); - - org.apache.pig.newplan.logical.relational.LogicalPlan inner = hforeach.getInnerPlan(); - LOGenerate gen = (LOGenerate) inner.getSinks().get(0); - - List<LogicalExpressionPlan> exps = gen.getOutputPlans(); - List<Operator> preds = inner.getPredecessors(gen); - - currentPlans.push(currentPlan); - - // we need to translate each predecessor of LOGenerate into a physical plan. - // The physical plan should contain the expression plan for this predecessor plus - // the subtree starting with this predecessor - for (int i = 0; i < exps.size(); i++) { - currentPlan = new PhysicalPlan(); - // translate the expression plan - PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(exps.get(i)); - pushWalker(childWalker); - childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i), childWalker, gen, - currentPlan, logToPhyMap)); - popWalker(); - - List<Operator> leaves = exps.get(i).getSinks(); - for (Operator l : leaves) { - PhysicalOperator op = logToPhyMap.get(l); - if (l instanceof ProjectExpression) { - int input = ((ProjectExpression) l).getInputNum(); - - // for each sink projection, get its input logical plan and translate it - Operator pred = preds.get(input); - childWalker = new SubtreeDependencyOrderWalker(inner, pred); - pushWalker(childWalker); - childWalker.walk(this); - popWalker(); - - // get the physical operator of the leaf of input logical plan - PhysicalOperator leaf = logToPhyMap.get(pred); - - if (pred instanceof LOInnerLoad) { - // if predecessor is only an LOInnerLoad, remove the project that - // comes from LOInnerLoad and change the column of project that - // comes from expression plan - currentPlan.remove(leaf); - logToPhyMap.remove(pred); - - POProject leafProj = (POProject) leaf; - try { - if (leafProj.isStar()) { - ((POProject) op).setStar(true); - } else if (leafProj.isProjectToEnd()) { - ((POProject) op).setProjectToEnd(leafProj.getStartCol()); - } else { - ((POProject) op).setColumn(leafProj.getColumn()); - } - - } catch (ExecException e) { - throw new FrontendException(hforeach, "Cannot get column from " + leaf, - 2230, e); - } - - } else { - currentPlan.connect(leaf, op); - } - } - } - innerPlans.add(currentPlan); - } - - currentPlan = currentPlans.pop(); - - // PhysicalOperator poGen = new POGenerate(new OperatorKey("", - // r.nextLong()), inputs, toBeFlattened); - boolean[] flatten = gen.getFlattenFlags(); - List<Boolean> flattenList = new ArrayList<Boolean>(); - for (boolean fl : flatten) { - flattenList.add(fl); - } - // Create new PORollupHIIForEach for translation from Logical Plan to Physical Plan - PORollupHIIForEach poHFE = new PORollupHIIForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), - hforeach.getRequestedParallelism(), innerPlans, flattenList); - - // if the pivot position is zero, set the pivot position for physical op is zero - if(hforeach.getPivot() == 0) - poHFE.setPivot(0); - //else, decrease pivot position by one, because the position user specified and the - //rollup field index is different by one - else - poHFE.setPivot(hforeach.getPivot() - 1); - //get the start field index and size of rollup position in case the rollup does not stand at the front - poHFE.setRollupFieldIndex(hforeach.getRollupFieldIndex()); - poHFE.setRollupOldFieldIndex(hforeach.getRollupOldFieldIndex()); - poHFE.setRollupSize(hforeach.getRollupSize()); - poHFE.setDimensionSize(hforeach.getDimensionSize()); - - poHFE.addOriginalLocation(hforeach.getAlias(), hforeach.getLocation()); - poHFE.setResultType(DataType.BAG); - logToPhyMap.put(hforeach, poHFE); - currentPlan.add(poHFE); - - // generate cannot have multiple inputs - List<Operator> op = hforeach.getPlan().getPredecessors(hforeach); - - // generate may not have any predecessors - if (op == null) - return; - - PhysicalOperator from = logToPhyMap.get(op.get(0)); - try { - currentPlan.connect(from, poHFE); - } catch (Exception e) { - int errCode = 2015; - String msg = "Invalid physical operators in the physical plan"; - throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); - } - - translateSoftLinks(hforeach); - } - /** * This function takes in a List of LogicalExpressionPlan and converts them to * a list of PhysicalPlans @@ -1133,19 +1011,6 @@ public class LogToPhyTranslationVisitor case REGULAR: POPackage poPackage = compileToLR_GR_PackTrio(cg, cg.getCustomPartitioner(), cg.getInner(), cg.getExpressionPlans()); poPackage.getPkgr().setPackageType(PackageType.GROUP); - if(cg.getPivot()!=-1) { - //Set the pivot value - poPackage.setPivot(cg.getPivot()); - //Set the size of total fields that involve in CUBE clause - poPackage.setDimensionSize(cg.getDimensionSize()); - //Set the index of the first field involves in ROLLUP - poPackage.setRollupFieldIndex(cg.getRollupFieldIndex()); - //Set the original index of the first field involves in ROLLUP in case it was moved to the end - //(if we have the combination of cube and rollup) - poPackage.setRollupOldFieldIndex(cg.getRollupOldFieldIndex()); - //Set number of algebraic functions that used after rollup - poPackage.setNumberAlgebraic(cg.getNumberAlgebraic()); - } logToPhyMap.put(cg, poPackage); break; case MERGE: @@ -1887,7 +1752,7 @@ public class LogToPhyTranslationVisitor throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e); } - CompilerUtils.addEmptyBagOuterJoin(fePlan, Util.translateSchema(inputSchema)); + CompilerUtils.addEmptyBagOuterJoin(fePlan, Util.translateSchema(inputSchema), false, null); } Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Fri Mar 4 18:17:39 2016 @@ -263,7 +263,6 @@ public class LogicalPlan extends BaseOpe disabledOptimizerRules.add("ColumnMapKeyPrune"); disabledOptimizerRules.add("AddForEach"); disabledOptimizerRules.add("GroupByConstParallelSetter"); - disabledOptimizerRules.add("RollupHIIOptimizer"); } try { Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java Fri Mar 4 18:17:39 2016 @@ -57,9 +57,6 @@ public abstract class LogicalRelationalN public void visit(LOForEach foreach) throws FrontendException { } - public void visit(LORollupHIIForEach horeach) throws FrontendException { - } - public void visit(LOGenerate gen) throws FrontendException { } Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Fri Mar 4 18:17:39 2016 @@ -363,7 +363,7 @@ public class LogicalSchema { if (mode==MergeMode.UnionInner) { if (fs1.type!=fs2.type) // We don't merge inner schema of different type for union, throw exception - throw new FrontendException("Incompatable field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031); + throw new FrontendException("Incompatible field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031); else mergedType = fs1.type; } @@ -371,7 +371,7 @@ public class LogicalSchema { if (fs1.type==DataType.NULL||fs1.type==DataType.BYTEARRAY) // If declared schema does not have type part mergedType = fs2.type; else if (!DataType.castable(fs1.type, fs2.type)) - throw new FrontendException("Incompatable field schema: declared is \"" + fs1.toString(false) + "\", infered is \"" + fs2.toString(false) + "\"", 1031); + throw new FrontendException("Incompatible field schema: declared is \"" + fs1.toString(false) + "\", infered is \"" + fs2.toString(false) + "\"", 1031); else mergedType = fs1.type; // If compatible type, we take the declared type } else { @@ -438,7 +438,7 @@ public class LogicalSchema { // Only check compatibility mergedSubSchema = LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.LoadForEachInner); } catch (FrontendException e) { - throw new FrontendException("Incompatable field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031); + throw new FrontendException("Incompatible field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031); } } } @@ -757,7 +757,7 @@ public class LogicalSchema { if (mode==MergeMode.Union) // In union, incompatible type result a null schema return null; else - throw new FrontendException("Incompatable schema: left is \"" + s1.toString(false) + "\", right is \"" + s2.toString(false) + "\"", 1031); + throw new FrontendException("Incompatible schema: left is \"" + s1.toString(false) + "\", right is \"" + s2.toString(false) + "\"", 1031); } LogicalSchema mergedSchema = new LogicalSchema(); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java Fri Mar 4 18:17:39 2016 @@ -39,7 +39,8 @@ public class MapSideMergeValidator { if (!(lo instanceof LOFilter || lo instanceof LOGenerate || lo instanceof LOInnerLoad || lo instanceof LOLoad || lo instanceof LOSplitOutput - || lo instanceof LOSplit + || lo instanceof LOSplit + || (lo instanceof LOJoin && ((LOJoin)lo).getJoinType() == LOJoin.JOINTYPE.REPLICATED) || isAcceptableSortOp(lo) || isAcceptableForEachOp(lo))) { throw new LogicalToPhysicalTranslatorException(errMsg, errCode); @@ -58,8 +59,7 @@ public class MapSideMergeValidator { private boolean isAcceptableForEachOp(Operator lo) throws LogicalToPhysicalTranslatorException { if (lo instanceof LOForEach) { OperatorPlan innerPlan = ((LOForEach) lo).getInnerPlan(); - validateMapSideMerge(innerPlan.getSinks(), innerPlan); - return !containsUDFs((LOForEach) lo); + return validateMapSideMerge(innerPlan.getSinks(), innerPlan); } else { return false; } @@ -82,22 +82,4 @@ public class MapSideMergeValidator { } return true; } - - private boolean containsUDFs(LOForEach fo) throws LogicalToPhysicalTranslatorException { - LogicalPlan logExpPlan = fo.getInnerPlan(); - UDFFinder udfFinder; - try { - udfFinder = new UDFFinder(logExpPlan); - udfFinder.visit(); - // TODO (dvryaboy): in the future we could relax this rule by tracing what fields - // are being passed into the UDF, and only refusing if the UDF is working on the - // join key. Transforms of other fields should be ok. - if (udfFinder.getUDFList().size() != 0) { - return true; - } - } catch (FrontendException e) { - throw new LogicalToPhysicalTranslatorException(e); - } - return false; - } } Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java Fri Mar 4 18:17:39 2016 @@ -27,7 +27,6 @@ import org.apache.pig.newplan.logical.ex import org.apache.pig.newplan.logical.expression.UserFuncExpression; import org.apache.pig.newplan.logical.relational.LOForEach; import org.apache.pig.newplan.logical.relational.LOGenerate; -import org.apache.pig.newplan.logical.relational.LORollupHIIForEach; import org.apache.pig.newplan.logical.relational.LogicalPlan; public class OptimizerUtils { @@ -42,16 +41,6 @@ public class OptimizerUtils { } /** - * Find generate op from the rolluphiiforeach operator. - * @param foreach the LORollupHIIForEach instance - * @return LOGenerate instance - */ - public static LOGenerate findGenerate(LORollupHIIForEach hfe) { - LogicalPlan inner = hfe.getInnerPlan(); - return (LOGenerate) inner.getSinks().get(0); - } - - /** * Check if a given LOGenerate operator has any flatten fields. * @param gen the given LOGenerate instance * @return true if LOGenerate instance contains flatten fields, false otherwise Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Fri Mar 4 18:17:39 2016 @@ -20,16 +20,19 @@ package org.apache.pig.newplan.logical.r import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.Expression; -import org.apache.pig.LoadFunc; -import org.apache.pig.LoadMetadata; import org.apache.pig.Expression.BinaryExpression; import org.apache.pig.Expression.Column; +import org.apache.pig.LoadFunc; +import org.apache.pig.LoadMetadata; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.newplan.FilterExtractor; import org.apache.pig.newplan.Operator; @@ -45,6 +48,8 @@ import org.apache.pig.newplan.optimizer. import org.apache.pig.newplan.optimizer.Transformer; public class PartitionFilterOptimizer extends Rule { + + private static final Log LOG = LogFactory.getLog(PartitionFilterOptimizer.class); private String[] partitionKeys; /** @@ -100,6 +105,7 @@ public class PartitionFilterOptimizer ex public class PartitionFilterPushDownTransformer extends Transformer { protected OperatorSubPlan subPlan; + private boolean planChanged; @Override public boolean check(OperatorPlan matched) throws FrontendException { @@ -136,7 +142,10 @@ public class PartitionFilterOptimizer ex @Override public OperatorPlan reportChanges() { - return subPlan; + // Return null in case there is no partition filter extracted + // which means the plan hasn't changed. + // If not return the modified plan which has filters removed. + return planChanged ? subPlan : null; } @Override @@ -148,6 +157,7 @@ public class PartitionFilterOptimizer ex FilterExtractor filterFinder = new PartitionFilterExtractor(loFilter.getFilterPlan(), getMappedKeys(partitionKeys)); filterFinder.visit(); + LOG.info("Partition keys are " + Arrays.asList(partitionKeys)); Expression partitionFilter = filterFinder.getPushDownExpression(); if(partitionFilter != null) { @@ -157,7 +167,9 @@ public class PartitionFilterOptimizer ex // LoadFunc.getSchema() updateMappedColNames(partitionFilter); try { + LOG.info("Setting partition filter [" + partitionFilter + "] on loader " + loadMetadata); loadMetadata.setPartitionFilter(partitionFilter); + planChanged = true; } catch (IOException e) { throw new FrontendException( e ); } Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java Fri Mar 4 18:17:39 2016 @@ -24,13 +24,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.Expression; +import org.apache.pig.Expression.BinaryExpression; +import org.apache.pig.Expression.Column; import org.apache.pig.LoadFunc; import org.apache.pig.LoadMetadata; import org.apache.pig.LoadPredicatePushdown; -import org.apache.pig.Expression.BinaryExpression; -import org.apache.pig.Expression.Column; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.newplan.Operator; import org.apache.pig.newplan.OperatorPlan; @@ -46,6 +48,8 @@ import org.apache.pig.newplan.optimizer. public class PredicatePushdownOptimizer extends Rule { + private static final Log LOG = LogFactory.getLog(PredicatePushdownOptimizer.class); + public PredicatePushdownOptimizer(String name) { super(name, false); } @@ -153,6 +157,7 @@ public class PredicatePushdownOptimizer // LoadFunc.getSchema() updateMappedColNames(pushDownPredicate); try { + LOG.info("Setting predicate pushdown filter [" + pushDownPredicate + "] on loader " + loadPredPushdown); loadPredPushdown.setPushdownPredicate(pushDownPredicate); } catch (IOException e) { throw new FrontendException( e ); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java Fri Mar 4 18:17:39 2016 @@ -227,7 +227,7 @@ public class TypeCheckingExpVisitor exte private String generateIncompatibleTypesMessage(BinaryExpression binOp) throws FrontendException { String msg = binOp.toString(); - if (currentRelOp.getAlias()!=null){ + if (currentRelOp != null && currentRelOp.getAlias() != null) { msg = "In alias " + currentRelOp.getAlias() + ", "; } LogicalFieldSchema lhsFs = binOp.getLhs().getFieldSchema(); @@ -488,7 +488,7 @@ public class TypeCheckingExpVisitor exte public void visit(CastExpression cast) throws FrontendException { byte inType = cast.getExpression().getType(); byte outType = cast.getType(); - if(outType == DataType.BYTEARRAY){ + if(outType == DataType.BYTEARRAY && inType != outType) { int errCode = 1051; String msg = "Cannot cast to bytearray"; msgCollector.collect(msg, MessageType.Error) ; @@ -710,7 +710,7 @@ public class TypeCheckingExpVisitor exte String msg = "Unable to get list of overloaded methods."; throw new TypeCheckerException(func, msg, errCode, PigException.INPUT, e); } - + // EvalFunc's schema type SchemaType udfSchemaType = ef.getSchemaType(); @@ -820,7 +820,7 @@ public class TypeCheckingExpVisitor exte * input schema * @param func - * udf expression - * @param udfSchemaType - + * @param udfSchemaType - * schema type of the udf * @return the funcSpec that supports the schema that is best suited to s. * The best suited schema is one that has the lowest score as @@ -912,6 +912,7 @@ public class TypeCheckingExpVisitor exte /* (non-Javadoc) * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object) */ + @Override public int compare(Pair<Long, FuncSpec> o1, Pair<Long, FuncSpec> o2) { if(o1.first < o2.first) return -1; @@ -1135,11 +1136,11 @@ public class TypeCheckingExpVisitor exte //if there's no more UDF field: take the last one which is the vararg field udfFieldSchema = j.hasNext() ? j.next() : udfFieldSchema; - + if(ignoreByteArrays && inputFieldSchema.type == DataType.BYTEARRAY) { continue; } - + if (inputFieldSchema.type != udfFieldSchema.type) { return false; } @@ -1313,7 +1314,7 @@ public class TypeCheckingExpVisitor exte if(s1==null || s2==null) return INF; List<FieldSchema> sFields = s1.getFields(); List<FieldSchema> fsFields = s2.getFields(); - + if((s2Type == SchemaType.NORMAL) && (sFields.size()!=fsFields.size())) return INF; if((s2Type == SchemaType.VARARG) && (sFields.size() < fsFields.size())) @@ -1332,9 +1333,9 @@ public class TypeCheckingExpVisitor exte // of this function if (sFS.type == DataType.BYTEARRAY) continue; - + //if we get to the vararg field (if defined) : take it repeatedly - FieldSchema fsFS = ((s2Type == SchemaType.VARARG) && i >= s2.size()) ? + FieldSchema fsFS = ((s2Type == SchemaType.VARARG) && i >= s2.size()) ? fsFields.get(s2.size() - 1) : fsFields.get(i); if(DataType.isSchemaType(sFS.type)){ @@ -1361,7 +1362,7 @@ public class TypeCheckingExpVisitor exte for (FieldSchema fFSch : fsLst) { ++i; //if we get to the vararg field (if defined) : take it repeatedly - FieldSchema tFSch = ((toSchType == SchemaType.VARARG) && i >= tsLst.size()) ? + FieldSchema tFSch = ((toSchType == SchemaType.VARARG) && i >= tsLst.size()) ? tsLst.get(tsLst.size() - 1) : tsLst.get(i); if (fFSch.type == tFSch.type) { continue; Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Fri Mar 4 18:17:39 2016 @@ -767,12 +767,14 @@ public class TypeCheckingRelVisitor exte throw new FrontendException(msg, errCode, PigException.INPUT, false, null) ; } byte innerType = ((LogicalExpression)innerPlans.get(0).getSources().get(0)).getType() ; - groupType = DataType.mergeType(groupType, innerType) ; - if (groupType == -1) + byte newGroupType = DataType.mergeType(groupType, innerType) ; + if (newGroupType == -1) { int errCode = 1107; - String msg = "Cannot merge join keys, incompatible types"; + String msg = "Cannot merge join keys, incompatible types. Outer: " + DataType.findTypeName(groupType) + "; inner: " + DataType.findTypeName(innerType); throw new FrontendException(msg, errCode, PigException.INPUT) ; + } else { + groupType = newGroupType; } } @@ -806,12 +808,14 @@ public class TypeCheckingRelVisitor exte throw new FrontendException(msg, errCode, PigException.INPUT, false, null) ; } byte innerType = ((LogicalExpression)innerPlans.get(0).getSources().get(0)).getType() ; - groupType = DataType.mergeType(groupType, innerType) ; - if (groupType == -1) + byte newGroupType = DataType.mergeType(groupType, innerType) ; + if (newGroupType == -1) { int errCode = 1107; - String msg = "Cannot merge join keys, incompatible types"; + String msg = "Cannot merge join keys, incompatible types. Outer: " + DataType.findTypeName(groupType) + "; inner: " + DataType.findTypeName(innerType); throw new FrontendException(msg, errCode, PigException.INPUT) ; + } else { + groupType = newGroupType; } } Modified: pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g Fri Mar 4 18:17:39 2016 @@ -247,10 +247,6 @@ cube_clause : ^( CUBE cube_item ) ; -pivot_clause - : ^( PIVOT INTEGER ) -; - cube_item : rel ( cube_by_clause ) ; @@ -264,7 +260,7 @@ cube_or_rollup ; cube_rollup_list - : ^( CUBE cube_by_expr_list ) | ^( ROLLUP cube_by_expr_list pivot_clause? ) + : ^( ( CUBE | ROLLUP ) cube_by_expr_list ) ; cube_by_expr_list @@ -646,7 +642,6 @@ eid : rel_str_op | FOREACH | CUBE | ROLLUP - | PIVOT | MATCHES | ORDER | RANK Modified: pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g Fri Mar 4 18:17:39 2016 @@ -106,10 +106,6 @@ parallel_clause : ^( PARALLEL INTEGER ) { sb.append(" ").append($PARALLEL.text).append(" ").append($INTEGER.text); } ; -pivot_clause - : ^( PIVOT INTEGER ) { sb.append(" ").append($PIVOT.text).append(" ").append($INTEGER.text); } -; - alias : IDENTIFIER { sb.append($IDENTIFIER.text); } ; @@ -266,7 +262,7 @@ cube_or_rollup ; cube_rollup_list - : ^( CUBE { sb.append($CUBE.text).append("("); } cube_by_expr_list { sb.append(")"); } ) | ^( ROLLUP { sb.append($ROLLUP.text).append("("); } cube_by_expr_list { sb.append(")"); } ) + : ^( ( CUBE { sb.append($CUBE.text).append("("); } | ROLLUP { sb.append($ROLLUP.text).append("("); } ) cube_by_expr_list { sb.append(")"); }) ; cube_by_expr_list @@ -274,7 +270,7 @@ cube_by_expr_list ; cube_by_expr - : col_range | expr | STAR { sb.append($STAR.text); } { sb.append(" "); } + : col_range | expr | STAR { sb.append($STAR.text); } ; group_clause @@ -676,7 +672,6 @@ eid : rel_str_op | FOREACH { sb.append($FOREACH.text); } | CUBE { sb.append($CUBE.text); } | ROLLUP { sb.append($ROLLUP.text); } - | PIVOT { sb.append($PIVOT.text); } | MATCHES { sb.append($MATCHES.text); } | ORDER { sb.append($ORDER.text); } | RANK { sb.append($RANK.text); } Modified: pig/branches/spark/src/org/apache/pig/parser/AstValidator.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AstValidator.g?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/AstValidator.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/AstValidator.g Fri Mar 4 18:17:39 2016 @@ -296,10 +296,6 @@ cube_clause : ^( CUBE cube_item ) ; -pivot_clause - : ^( PIVOT INTEGER ) -; - cube_item : rel ( cube_by_clause ) ; @@ -313,7 +309,7 @@ cube_or_rollup ; cube_rollup_list - : ^( CUBE cube_by_expr_list ) | ^( ROLLUP cube_by_expr_list pivot_clause? ) + : ^( ( CUBE | ROLLUP ) cube_by_expr_list ) ; cube_by_expr_list @@ -667,7 +663,6 @@ eid : rel_str_op | FOREACH | CUBE | ROLLUP - | PIVOT | MATCHES | ORDER | RANK Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Mar 4 18:17:39 2016 @@ -451,30 +451,10 @@ public class LogicalPlanBuilder { return new LOCube(plan); } - void setPivotRollupCubeOp(LOCube op, Integer pivot) throws ParserValidationException { - if(pivot!=null) - op.setPivot(pivot); - } - String buildCubeOp(SourceLocation loc, LOCube op, String alias, String inputAlias, List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans) throws ParserValidationException { - // check value of pivot if it is valid or not, if not pivot position - // is specified, the pivot at middle position will be chosen - try { - if(op.getPivot()!=-1) { - if (op.getPivot() < 0 || op.getPivot() >= expressionPlans.get(0).size()) { - FrontendException fe = new FrontendException("PIVOT is out of bound"); - throw fe; - } - } - else - op.setPivot((int)(Math.round(expressionPlans.get(0).size()/2.0))); - } catch (FrontendException e) { - throw new ParserValidationException(intStream, loc, e); - } - // check if continuously occurring cube operations be combined combineCubeOperations((ArrayList<String>) operations, expressionPlans); @@ -733,7 +713,6 @@ public class LogicalPlanBuilder { // build group by operator try { - groupby.setPivot(op.getPivot()); return buildGroupOp(loc, (LOCogroup) groupby, op.getAlias(), inpAliases, exprPlansCopy, GROUPTYPE.REGULAR, innerFlags, null); } catch (ParserValidationException pve) { Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g Fri Mar 4 18:17:39 2016 @@ -493,20 +493,12 @@ func_args returns[List<String> args] // It also outputs the order of operations i.e in this case CUBE operation followed by ROLLUP operation // These inputs are passed to buildCubeOp methods which then builds the logical plan for CUBE operator. // If user specifies STAR or RANGE expression for dimensions then it will be expanded inside buildCubeOp. -pivot_clause returns[int pivot] - : ^( PIVOT INTEGER ) - { - $pivot = Integer.parseInt( $INTEGER.text ); - } -; - cube_clause returns[String alias] scope { LOCube cubeOp; MultiMap<Integer, LogicalExpressionPlan> cubePlans; List<String> operations; int inputIndex; - int pivot; } scope GScope; @init { @@ -556,7 +548,7 @@ cube_rollup_list returns[String operatio @init { $plans = new ArrayList<LogicalExpressionPlan>(); } - : ^( CUBE { $operation = "CUBE"; } cube_by_expr_list { $plans = $cube_by_expr_list.plans; } ) | ^( ROLLUP { $operation = "ROLLUP"; } cube_by_expr_list { $plans = $cube_by_expr_list.plans; } pivot_clause? { if ($pivot_clause.tree!=null) builder.setPivotRollupCubeOp($cube_clause::cubeOp, $pivot_clause.pivot); } ) + : ^( ( CUBE { $operation = "CUBE"; } | ROLLUP { $operation = "ROLLUP"; } ) cube_by_expr_list { $plans = $cube_by_expr_list.plans; } ) ; cube_by_expr_list returns[List<LogicalExpressionPlan> plans] @@ -1949,7 +1941,6 @@ eid returns[String id] : rel_str_op { $i | COGROUP { $id = $COGROUP.text; } | CUBE { $id = $CUBE.text; } | ROLLUP { $id = $ROLLUP.text; } - | PIVOT { $id = $PIVOT.text; } | JOIN { $id = $JOIN.text; } | CROSS { $id = $CROSS.text; } | UNION { $id = $UNION.text; } Modified: pig/branches/spark/src/org/apache/pig/parser/QueryLexer.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryLexer.g?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/QueryLexer.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/QueryLexer.g Fri Mar 4 18:17:39 2016 @@ -153,9 +153,6 @@ ONSCHEMA : 'ONSCHEMA' PARALLEL : 'PARALLEL' ; -PIVOT : 'PIVOT' -; - PARTITION : 'PARTITION' ; Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Fri Mar 4 18:17:39 2016 @@ -595,10 +595,7 @@ union_clause : UNION^ ONSCHEMA? rel_list cube_clause : CUBE rel BY cube_rollup_list ( COMMA cube_rollup_list )* -> ^( CUBE rel ^( BY cube_rollup_list+ ) ) ; -cube_rollup_list : ( CUBE^ LEFT_PAREN! real_arg ( COMMA! real_arg )* RIGHT_PAREN! ) | ( ROLLUP^ LEFT_PAREN! real_arg ( COMMA! real_arg )* RIGHT_PAREN! pivot_clause? ) -; - -pivot_clause : PIVOT^ INTEGER +cube_rollup_list : ( CUBE | ROLLUP )^ LEFT_PAREN! real_arg ( COMMA! real_arg )* RIGHT_PAREN! ; flatten_clause : FLATTEN^ LEFT_PAREN! expr RIGHT_PAREN! Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java Fri Mar 4 18:17:39 2016 @@ -315,32 +315,14 @@ public class QueryParserDriver { if (t.getText().equalsIgnoreCase(REGISTER_DEF)) { String path = t.getChild(0).getText(); path = path.substring(1, path.length()-1); - - if (path.endsWith(".jar")) { - if (t.getChildCount() != 1) { - throw new ParserException("REGISTER statement refers to JAR but has a USING..AS scripting engine clause. " + - "Statement: " + t.toStringTree()); - } - - try { - getPigServer().registerJar(path); - } catch (IOException ioe) { - throw new ParserException(ioe.getMessage()); - } - } else { - if (t.getChildCount() != 5) { - throw new ParserException("REGISTER statement for non-JAR file requires a USING scripting_lang AS namespace clause. " + - "Ex. REGISTER 'my_file.py' USING jython AS my_jython_udfs;"); - } - - String scriptingLang = t.getChild(2).getText(); - String namespace = t.getChild(4).getText(); - - try { - getPigServer().registerCode(path, scriptingLang, namespace); - } catch (IOException ioe) { - throw new ParserException(ioe.getMessage()); + try { + if (t.getChildCount() == 5) { + new RegisterResolver(getPigServer()).parseRegister(path, t.getChild(2).getText(), t.getChild(4).getText()); + } else { + new RegisterResolver(getPigServer()).parseRegister(path, null, null); } + } catch (IOException ioe) { + throw new ParserException(ioe.getMessage()); } } else { for (int i = 0; i < t.getChildCount(); i++) { Modified: pig/branches/spark/src/org/apache/pig/pen/ExampleGenerator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/pen/ExampleGenerator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/pen/ExampleGenerator.java (original) +++ pig/branches/spark/src/org/apache/pig/pen/ExampleGenerator.java Fri Mar 4 18:17:39 2016 @@ -18,44 +18,42 @@ package org.apache.pig.pen; +import java.io.IOException; +import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Collection; -import java.util.Iterator; -import java.io.IOException; -import org.apache.pig.impl.util.IdentityHashSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; - - import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; -import org.apache.pig.PigException; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.IdentityHashSet; +import org.apache.pig.newplan.Operator; import org.apache.pig.newplan.logical.relational.LOForEach; +import org.apache.pig.newplan.logical.relational.LOLimit; import org.apache.pig.newplan.logical.relational.LOLoad; +import org.apache.pig.newplan.logical.relational.LOSort; import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; import org.apache.pig.newplan.logical.relational.LogicalSchema; -import org.apache.pig.newplan.logical.relational.LOSort; -import org.apache.pig.newplan.logical.relational.LOLimit; -import org.apache.pig.newplan.Operator; import org.apache.pig.pen.util.DisplayExamples; import org.apache.pig.pen.util.LineageTracer; /** - * This class is used to generate example tuples for the ILLUSTRATE purpose - * + * This class is used to generate example tuples for the ILLUSTRATE purpose + * * */ public class ExampleGenerator { @@ -73,7 +71,7 @@ public class ExampleGenerator { Log log = LogFactory.getLog(getClass()); private int MAX_RECORDS = 10000; - + private Map<Operator, PhysicalOperator> logToPhyMap; private Map<PhysicalOperator, Operator> poLoadToLogMap; private Map<PhysicalOperator, Operator> poToLogMap; @@ -107,11 +105,11 @@ public class ExampleGenerator { public LineageTracer getLineage() { return lineage; } - + public Map<Operator, PhysicalOperator> getLogToPhyMap() { return logToPhyMap; } - + public void setMaxRecords(int max) { MAX_RECORDS = max; } @@ -131,7 +129,7 @@ public class ExampleGenerator { poLoadToLogMap = new HashMap<PhysicalOperator, Operator>(); logToDataMap = new HashMap<Operator, DataBag>(); poToLogMap = new HashMap<PhysicalOperator, Operator>(); - + // set up foreach inner data map forEachInnerLogToDataMap = new HashMap<LOForEach, Map<LogicalRelationalOperator, DataBag>>(); for (Map.Entry<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> entry : forEachInnerLogToPhyMap.entrySet()) { @@ -150,7 +148,7 @@ public class ExampleGenerator { if (!hasLimit && lo instanceof LOLimit) hasLimit = true; } - + try { readBaseData(loads); } catch (ExecException e) { @@ -158,7 +156,7 @@ public class ExampleGenerator { throw e; } catch (FrontendException e) { log.error("Error reading data. " + e.getMessage()); - throw new RuntimeException(e.getMessage()); + throw new RuntimeException(e); } Map<Operator, DataBag> derivedData = null; @@ -191,7 +189,7 @@ public class ExampleGenerator { // create the final version of derivedData to give to the output derivedData = getData(physPlan); // System.out.println("Obtaining final derived data for output"); - + if (hasLimit) { augment.setLimit(); @@ -235,14 +233,14 @@ public class ExampleGenerator { PhysicalPlan result = execEngine.compile(plan, null); return result; } - + public Map<Operator, DataBag> getData() throws IOException, InterruptedException { return getData(physPlan); } - + private Map<Operator, DataBag> getData(PhysicalPlan plan) throws PigException, IOException, InterruptedException { - // get data on a physical plan possibly trimmed of one branch + // get data on a physical plan possibly trimmed of one branch lineage = new LineageTracer(); IllustratorAttacher attacher = new IllustratorAttacher(plan, lineage, MAX_RECORDS, poLoadToSchemaMap, pigContext); attacher.visit(); @@ -270,13 +268,13 @@ public class ExampleGenerator { phyToMRTransform(plan, attacher.getDataMap()); return logToDataMap; } - - public Map<Operator, DataBag> getData(Map<LOLoad, DataBag> newBaseData) throws Exception + + public Map<Operator, DataBag> getData(Map<LOLoad, DataBag> newBaseData) throws Exception { baseData = newBaseData; return getData(physPlan); } - + private void phyToMRTransform(PhysicalPlan plan, Map<PhysicalOperator, DataBag> phyToDataMap) { // remap the LO to PO as result of the MR compilation may have changed PO in the MR plans Map<PhysicalOperator, PhysicalOperator> phyToMRMap = localMRRunner.getPhyToMRMap(); @@ -288,14 +286,14 @@ public class ExampleGenerator { } } } - + private void getLogToDataMap(Map<PhysicalOperator, DataBag> phyToDataMap) { logToDataMap.clear(); for (Operator lo : logToPhyMap.keySet()) { if (logToPhyMap.get(lo) != null) logToDataMap.put(lo, phyToDataMap.get(logToPhyMap.get(lo))); } - + // set the LO-to-Data mapping for the ForEach inner plans for (Map.Entry<LOForEach, Map<LogicalRelationalOperator, DataBag>> entry : forEachInnerLogToDataMap.entrySet()) { entry.getValue().clear(); @@ -304,7 +302,7 @@ public class ExampleGenerator { } } } - + private void setLoadDataMap() { // This function sets up the LO-TO-Data map, eq. class, and lineage for the base data used in the coming runner // this must be called after logToDataMap has been properly (re)set and before the runner is started @@ -326,7 +324,7 @@ public class ExampleGenerator { } } } - + public Collection<IdentityHashSet<Tuple>> getEqClasses() { Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> logToEqclassesMap = getLoToEqClassMap(); LinkedList<IdentityHashSet<Tuple>> ret = new LinkedList<IdentityHashSet<Tuple>>(); @@ -342,7 +340,7 @@ public class ExampleGenerator { Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> ret = EquivalenceClasses.getLoToEqClassMap(physPlan, newPlan, logToPhyMap, logToDataMap, forEachInnerLogToPhyMap, poToEqclassesMap); // eq classes adjustments based upon logical operators - + for (Map.Entry<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> entry :ret.entrySet()) { if (entry.getKey() instanceof LOSort) { @@ -372,7 +370,7 @@ public class ExampleGenerator { } } } - + return ret; } } Modified: pig/branches/spark/src/org/apache/pig/scripting/BoundScript.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/BoundScript.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/BoundScript.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/BoundScript.java Fri Mar 4 18:17:39 2016 @@ -31,8 +31,8 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.pig.PigServer; import org.apache.pig.PigRunner.ReturnCode; +import org.apache.pig.PigServer; import org.apache.pig.impl.PigContext; import org.apache.pig.tools.grunt.GruntParser; import org.apache.pig.tools.pigscript.parser.ParseException; @@ -45,41 +45,41 @@ import org.apache.pig.tools.pigstats.Scr * This represents an instance of a bound pipeline. */ public class BoundScript { - + private static final Log LOG = LogFactory.getLog(BoundScript.class); - + private List<String> queries = new ArrayList<String>(); private String name = null; - + private ScriptPigContext scriptContext = null; - + BoundScript(String query, ScriptPigContext scriptContext, String name) { this.queries.add(query); this.scriptContext = scriptContext; - this.name = name; + this.name = name; } - + BoundScript(List<String> queries, ScriptPigContext scriptContext, String name) { this.queries.addAll(queries); this.scriptContext = ScriptPigContext.get(); - this.name = name; + this.name = name; } - + /** - * Run a pipeline on Hadoop. - * If there are no stores in this pipeline then nothing will be run. + * Run a pipeline on Hadoop. + * If there are no stores in this pipeline then nothing will be run. * @return {@link PigStats}, null if there is no bound query to run. * @throws IOException */ public PigStats runSingle() throws IOException { return runSingle((Properties)null); } - + /** - * Run a pipeline on Hadoop. - * If there are no stores in this pipeline then nothing will be run. + * Run a pipeline on Hadoop. + * If there are no stores in this pipeline then nothing will be run. * @param prop Map of properties that Pig should set when running the script. * This is intended for use with scripting languages that do not support * the Properties object. @@ -98,14 +98,14 @@ public class BoundScript { if (prop != null) { scriptContext.getPigContext().getProperties().putAll(prop); } - PigStats ret = exec(queries.get(0)); + PigStats ret = exec(queries.get(0)); setPigStats(ret); return ret; } - + /** - * Run a pipeline on Hadoop. - * If there are no stores in this pipeline then nothing will be run. + * Run a pipeline on Hadoop. + * If there are no stores in this pipeline then nothing will be run. * @param propfile File with properties that Pig should set when running the script. * @return {@link PigStats}, null if there is no bound query to run. * @throws IOException @@ -123,17 +123,17 @@ public class BoundScript { } /** - * Run multiple instances of bound pipeline on Hadoop in parallel. - * If there are no stores in this pipeline then nothing will be run. - * Bind is called first with the list of maps of variables to bind. + * Run multiple instances of bound pipeline on Hadoop in parallel. + * If there are no stores in this pipeline then nothing will be run. + * Bind is called first with the list of maps of variables to bind. * @return a list of {@link PigStats}, one for each map of variables passed * to bind. * @throws IOException - */ - public List<PigStats> run() throws IOException { + */ + public List<PigStats> run() throws IOException { return run((Properties)null); } - + /** * Run multiple instances of bound pipeline on Hadoop in parallel. * @param prop Map of properties that Pig should set when running the script. @@ -148,7 +148,7 @@ public class BoundScript { if (queries.isEmpty()) { LOG.info("No bound query to run."); return stats; - } + } if (queries.size() == 1) { PigStats ps = runSingle(); stats.add(ps); @@ -157,20 +157,20 @@ public class BoundScript { if (prop != null) { scriptContext.getPigContext().getProperties().putAll(prop); } - List<PigProgressNotificationListener> listeners + List<PigProgressNotificationListener> listeners = ScriptState.get().getAllListeners(); - SyncProgressNotificationAdaptor adaptor + SyncProgressNotificationAdaptor adaptor = new SyncProgressNotificationAdaptor(listeners); List<Future<PigStats>> futures = new ArrayList<Future<PigStats>>(); ExecutorService executor = Executors.newFixedThreadPool(queries.size()); - for (int i=0; i<queries.size(); i++) { + for (int i=0; i<queries.size(); i++) { Properties props = new Properties(); props.putAll(scriptContext.getPigContext().getProperties()); PigContext ctx = new PigContext(scriptContext.getPigContext().getExecType(), props); MyCallable worker = new MyCallable(queries.get(i), ctx, adaptor); Future<PigStats> submit = executor.submit(worker); futures.add(submit); - } + } for (Future<PigStats> future : futures) { try { stats.add(future.get()); @@ -178,23 +178,23 @@ public class BoundScript { LOG.error("Pig pipeline failed to complete", e); PigStatsUtil.getEmptyPigStats(); PigStatsUtil.setErrorMessage(e.getMessage()); - PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE); + PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE); stats.add(failed); } catch (ExecutionException e) { LOG.error("Pig pipeline failed to complete", e); PigStatsUtil.getEmptyPigStats(); - PigStatsUtil.setErrorMessage(e.getMessage()); - PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE); + PigStatsUtil.setErrorMessage(e.getMessage()); + PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE); stats.add(failed); } } - + if (!stats.isEmpty()) { setPigStats(stats);; } return stats; } - + /** * Run multiple instances of bound pipeline on Hadoop in parallel. * @param propfile File with properties that Pig should set when running the script. @@ -210,12 +210,12 @@ public class BoundScript { prop.load(fin); } finally { if (fin != null) fin.close(); - } + } return run(prop); } /** - * Run illustrate for this pipeline. Results will be printed to stdout. + * Run illustrate for this pipeline. Results will be printed to stdout. * @throws IOException if illustrate fails. */ public void illustrate() throws IOException { @@ -224,7 +224,7 @@ public class BoundScript { return; } PigServer pigServer = new PigServer(scriptContext.getPigContext(), false); - registerQuery(pigServer, queries.get(0)); + registerQueryForDiagnostics(pigServer, queries.get(0)); pigServer.getExamples(null); } @@ -238,7 +238,7 @@ public class BoundScript { return; } PigServer pigServer = new PigServer(scriptContext.getPigContext(), false); - registerQuery(pigServer, queries.get(0)); + registerQueryForDiagnostics(pigServer, queries.get(0)); pigServer.explain(null, System.out); } @@ -254,11 +254,11 @@ public class BoundScript { return; } PigServer pigServer = new PigServer(scriptContext.getPigContext(), false); - registerQuery(pigServer, queries.get(0)); - pigServer.dumpSchema(alias); + registerQueryForDiagnostics(pigServer, queries.get(0)); + pigServer.dumpSchema(alias); } - //------------------------------------------------------------------------- + //------------------------------------------------------------------------- private PigStats exec(String query) throws IOException { LOG.info("Query to run:\n" + query); @@ -271,30 +271,30 @@ public class BoundScript { ScriptState.get().registerListener(listener); } PigServer pigServer = new PigServer(scriptContext.getPigContext(), false); - pigServer.setBatchOn(); GruntParser grunt = new GruntParser(new StringReader(query), pigServer); grunt.setInteractive(false); try { - grunt.parseStopOnError(true); + grunt.parseStopOnError(false); } catch (ParseException e) { throw new IOException("Failed to parse script " + e.getMessage(), e); } - pigServer.executeBatch(); return PigStats.get(); } - private void registerQuery(PigServer pigServer, String pl) throws IOException { + private void registerQueryForDiagnostics(PigServer pigServer, String pl) throws IOException { GruntParser grunt = new GruntParser(new StringReader(pl), pigServer); grunt.setInteractive(false); + // We want parsing to happen in batch. But no execution as this is for diagnostics pigServer.setBatchOn(); + pigServer.setSkipParseInRegisterForBatch(true); try { grunt.parseStopOnError(true); } catch (ParseException e) { throw new IOException("Failed to parse query: " + pl, e); } } - - private void setPigStats(PigStats stats) { + + private void setPigStats(PigStats stats) { ScriptEngine engine = scriptContext.getScriptEngine(); if (name != null) { engine.setPigStats(name, stats); @@ -304,28 +304,28 @@ public class BoundScript { } private void setPigStats(List<PigStats> lst) { - if (lst == null || lst.isEmpty()) return; + if (lst == null || lst.isEmpty()) return; String key = (name != null) ? name : this.toString(); ScriptEngine engine = scriptContext.getScriptEngine(); for (PigStats stats : lst) { engine.setPigStats(key, stats); - } + } } - + //------------------------------------------------------------------------- - + private class MyCallable implements Callable<PigStats> { - + private String query = null; private PigContext ctx = null; private PigProgressNotificationListener adaptor; - + public MyCallable(String pl, PigContext ctx, PigProgressNotificationListener adaptor) { query = pl; this.ctx = ctx; this.adaptor = adaptor; } - + @Override public PigStats call() throws Exception { LOG.info("Query to run:\n" + query); @@ -335,15 +335,13 @@ public class BoundScript { ScriptState.get().setScript(query); ScriptState.get().registerListener(adaptor); PigServer pigServer = new PigServer(ctx, true); - pigServer.setBatchOn(); GruntParser grunt = new GruntParser(new StringReader(query), pigServer); grunt.setInteractive(false); try { - grunt.parseStopOnError(true); + grunt.parseStopOnError(false); } catch (ParseException e) { throw new IOException("Failed to parse script", e); } - pigServer.executeBatch(); return PigStats.get(); } } Modified: pig/branches/spark/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java Fri Mar 4 18:17:39 2016 @@ -28,6 +28,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.net.URL; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -65,8 +66,21 @@ public class GroovyEvalFunc<T> extends E Class c = scriptClasses.get(path); if (null == c) { + File file = new File(path); + URL resource = null; + if (!file.exists()) { + resource = ScriptEngine.class.getResource(path); + if (resource == null) { + resource = ScriptEngine.class.getResource(File.separator + path); + } + if (resource == null) { + throw new IOException("Cannot find " + path); + } + } else { + resource = file.toURL(); + } try { - c = GroovyScriptEngine.getEngine().loadScriptByName(new File(path).toURI().toString()); + c = GroovyScriptEngine.getEngine().loadScriptByName(resource.toString()); } catch (ScriptException se) { throw new IOException(se); } catch (ResourceException re) { Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Fri Mar 4 18:17:39 2016 @@ -66,6 +66,7 @@ import org.apache.pig.impl.io.FileLocali import org.apache.pig.impl.io.FileLocalizer.FetchFileRet; import org.apache.pig.impl.util.LogUtils; import org.apache.pig.impl.util.TupleFormat; +import org.apache.pig.parser.RegisterResolver; import org.apache.pig.tools.pigscript.parser.ParseException; import org.apache.pig.tools.pigscript.parser.PigScriptParser; import org.apache.pig.tools.pigscript.parser.PigScriptParserTokenManager; @@ -461,15 +462,7 @@ public class GruntParser extends PigScri path = parameterSubstitutionInGrunt(path); scriptingLang = parameterSubstitutionInGrunt(scriptingLang); namespace = parameterSubstitutionInGrunt(namespace); - if(path.endsWith(".jar")) { - if(scriptingLang != null || namespace != null) { - throw new ParseException("Cannot register a jar with a scripting language or namespace"); - } - mPigServer.registerJar(path); - } - else { - mPigServer.registerCode(path, scriptingLang, namespace); - } + new RegisterResolver(mPigServer).parseRegister(path, scriptingLang, namespace); } private String runPreprocessor(String scriptPath, List<String> params, List<String> paramFiles) @@ -1227,8 +1220,14 @@ public class GruntParser extends PigScri } else { tokensList.add(hcatBin); } + cmd = cmd.trim(); + if (!cmd.substring(0, 3).toLowerCase().equals("sql")) { + // Should never happen + throw new IOException("sql command not start with sql keyword"); + } + cmd = cmd.substring(3).trim(); tokensList.add("-e"); - tokensList.add(cmd.substring(cmd.indexOf("sql")).substring(4).replaceAll("\n", " ")); + tokensList.add(cmd.replaceAll("\n", " ")); String[] tokens = tokensList.toArray(new String[]{}); // create new environment = environment - HADOOP_CLASSPATH Modified: pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Mar 4 18:17:39 2016 @@ -321,6 +321,18 @@ TOKEN_MGR_DECLS : { saveState(prevState); prevState = GENERATE; } : SCHEMA_DEFINITION +| <"--"> + { + prevState = getState(prevState); + saveState(prevState); + prevState = GENERATE; + } : SINGLE_LINE_COMMENT +| <"/*"> + { + prevState = getState(prevState); + saveState(prevState); + prevState = GENERATE; + } : MULTI_LINE_COMMENT | <"'"> { prevState = getState(prevState); Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/OutputStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/OutputStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/OutputStats.java Fri Mar 4 18:17:39 2016 @@ -51,20 +51,20 @@ public final class OutputStats { private boolean success; private POStore store = null; - + private Configuration conf; private static final Log LOG = LogFactory.getLog(OutputStats.class); - + public OutputStats(String location, long bytes, long records, boolean success) { this.location = location; this.bytes = bytes; - this.records = records; + this.records = records; this.success = success; try { this.name = new Path(location).getName(); } catch (Exception e) { - // location is a mal formatted URL + // location is a mal formatted URL this.name = location; } } @@ -81,6 +81,18 @@ public final class OutputStats { return bytes; } + public void setBytes(long bytes) { + this.bytes = bytes; + } + + public long getRecords() { + return records; + } + + public void setRecords(long records) { + this.records = records; + } + public long getNumberRecords() { return records; } @@ -94,6 +106,10 @@ public final class OutputStats { return success; } + public void setSuccessful(boolean success) { + this.success = success; + } + public String getAlias() { return (store == null) ? null : store.getAlias(); } @@ -129,11 +145,11 @@ public final class OutputStats { public void setPOStore(POStore store) { this.store = store; } - + public void setConf(Configuration conf) { this.conf = conf; } - + public Iterator<Tuple> iterator() throws IOException { final LoadFunc p; PigContext pigContext = ScriptState.get().getPigContext(); @@ -154,11 +170,12 @@ public final class OutputStats { String msg = "Unable to get results for: " + store.getSFile(); throw new ExecException(msg, errCode, PigException.BUG, e); } - - return new Iterator<Tuple>() { + + return new Iterator<Tuple>() { Tuple t; boolean atEnd; + @Override public boolean hasNext() { if (atEnd) return false; try { @@ -173,6 +190,7 @@ public final class OutputStats { return !atEnd; } + @Override public Tuple next() { Tuple next = t; if (next != null) { @@ -189,10 +207,11 @@ public final class OutputStats { return next; } + @Override public void remove() { throw new RuntimeException("Removal not supported"); } - + }; } } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Mar 4 18:17:39 2016 @@ -44,6 +44,11 @@ public class PigStatsUtil { = "HDFS_BYTES_WRITTEN"; public static final String HDFS_BYTES_READ = "HDFS_BYTES_READ"; + public static final String FILE_BYTES_WRITTEN + = "FILE_BYTES_WRITTEN"; + public static final String FILE_BYTES_READ + = "FILE_BYTES_READ"; + public static final String MULTI_INPUTS_RECORD_COUNTER = "Input records from ";
