Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed Apr 12 02:20:20 2017 @@ -42,28 +42,28 @@ import org.apache.pig.tools.pigstats.Pig public class PigInputFormatSpark extends PigInputFormat { - @Override - public RecordReader<Text, Tuple> createRecordReader(InputSplit split, - TaskAttemptContext context) throws IOException, - InterruptedException { - initLogger(); - resetUDFContext(); - //PigSplit#conf is the default hadoop configuration, we need get the configuration - //from context.getConfigration() to retrieve pig properties - PigSplit pigSplit = (PigSplit) split; - Configuration conf = context.getConfiguration(); - pigSplit.setConf(conf); - //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX) - //which will be used in POMergeCogroup#setup - if (PigMapReduce.sJobContext == null) { - PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID()); - } - PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex()); - // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and - // SchemaTupleBackend by reading properties from JobConf - initialize(conf); - return super.createRecordReader(split, context); +@Override +public RecordReader<Text, Tuple> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, + 
InterruptedException { + initLogger(); + resetUDFContext(); + //PigSplit#conf is the default hadoop configuration, we need get the configuration + //from context.getConfigration() to retrieve pig properties + PigSplit pigSplit = (PigSplit) split; + Configuration conf = context.getConfiguration(); + pigSplit.setConf(conf); + //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX) + //which will be used in POMergeCogroup#setup + if (PigMapReduce.sJobContext == null) { + PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID()); } + PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex()); + // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and + // SchemaTupleBackend by reading properties from JobConf + initialize(conf); + return super.createRecordReader(split, context); +} private void initialize(Configuration jobConf) throws IOException { MapRedUtil.setupUDFContext(jobConf); @@ -73,12 +73,12 @@ public class PigInputFormatSpark extends } private void resetUDFContext() { - UDFContext.getUDFContext().reset(); - } + UDFContext.getUDFContext().reset(); + } - private void initLogger() { - PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); - pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); - PhysicalOperator.setPigLogger(pigHadoopLogger); - } -} \ No newline at end of file + private void initLogger() { + PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); + pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); + PhysicalOperator.setPigLogger(pigHadoopLogger); + } +}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Wed Apr 12 02:20:20 2017 @@ -291,7 +291,7 @@ public class TezPlanContainer extends Op Set<TezOperator> splitters2 = new HashSet<>(); Set<TezOperator> processedPredecessors = new HashSet<>(); // Find predecessors which are splitters - fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1); + fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1, false); if (!splitters1.isEmpty()) { // For the successor, traverse rest of the plan below it and // search the predecessors of its successors to find any predecessor that might be a splitter. 
@@ -300,7 +300,7 @@ public class TezPlanContainer extends Op processedPredecessors.clear(); processedPredecessors.add(successor); for (TezOperator succ : allSuccs) { - fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2); + fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2, true); } // Find the common ones splitters1.retainAll(splitters2); @@ -309,7 +309,7 @@ public class TezPlanContainer extends Op } private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp, - Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) { + Set<TezOperator> processedPredecessors, Set<TezOperator> splitters, boolean stopAtSplit) { List<TezOperator> predecessors = plan.getPredecessors(tezOp); if (predecessors != null) { for (TezOperator pred : predecessors) { @@ -319,9 +319,13 @@ public class TezPlanContainer extends Op } if (pred.isSplitter()) { splitters.add(pred); + if (!stopAtSplit) { + processedPredecessors.add(pred); + fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters, stopAtSplit); + } } else if (!pred.needSegmentBelow()) { processedPredecessors.add(pred); - fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters); + fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters, stopAtSplit); } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java Wed Apr 12 02:20:20 2017 @@ -37,6 +37,6 
@@ public class AccumulatorOptimizer extend @Override public void visitTezOp(TezOperator tezOp) throws VisitorException { - AccumulatorOptimizerUtil.addAccumulator(tezOp.plan); + AccumulatorOptimizerUtil.addAccumulator(tezOp.plan, tezOp.plan.getRoots()); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Wed Apr 12 02:20:20 2017 @@ -102,7 +102,8 @@ public class SecondaryKeyOptimizerTez ex rearrangePlan = PlanHelper.getLocalRearrangePlanFromSplit(from.plan, connectingLR.getOperatorKey()); } - SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(rearrangePlan, to.plan); + SecondaryKeyOptimizerUtil secondaryKeyOptUtil = new SecondaryKeyOptimizerUtil(); + SecondaryKeyOptimizerInfo info = secondaryKeyOptUtil.applySecondaryKeySort(rearrangePlan, to.plan); if (info != null) { numSortRemoved += info.getNumSortRemoved(); numDistinctChanged += info.getNumDistinctChanged(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java 
(original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Wed Apr 12 02:20:20 2017 @@ -61,9 +61,8 @@ public class AccumulatorOptimizerUtil { return batchSize; } - public static void addAccumulator(PhysicalPlan plan) { + public static void addAccumulator(PhysicalPlan plan, List<PhysicalOperator> pos) { // See if this is a map-reduce job - List<PhysicalOperator> pos = plan.getRoots(); if (pos == null || pos.size() == 0) { return; } @@ -290,91 +289,4 @@ public class AccumulatorOptimizerUtil { return false; } - - public static void addAccumulatorSpark(PhysicalPlan plan) throws - VisitorException { - List<PhysicalOperator> pos = plan.getRoots(); - if (pos == null || pos.size() == 0) { - return; - } - - List<POGlobalRearrangeSpark> gras = PlanHelper.getPhysicalOperators(plan, - POGlobalRearrangeSpark.class); - - for (POGlobalRearrange gra : gras) { - addAccumulatorSparkForGRASubDAG(plan, gra); - } - } - - - private static void addAccumulatorSparkForGRASubDAG(PhysicalPlan plan, - POGlobalRearrange gra) throws VisitorException { - - List<PhysicalOperator> poPackages = plan.getSuccessors(gra); - - if (poPackages == null || poPackages.size() == 0) { - return; - } - // See if this is a POPackage - PhysicalOperator po_package = poPackages.get(0); - if (!po_package.getClass().equals(POPackage.class)) { - return; - } - - Packager pkgr = ((POPackage) po_package).getPkgr(); - // Check that this is a standard package, not a subclass - if (!pkgr.getClass().equals(Packager.class)) { - return; - } - - // if POPackage is for distinct, just return - if (pkgr.isDistinct()) { - return; - } - - // if any input to POPackage is inner, just return - boolean[] isInner = pkgr.getInner(); - for (boolean b : isInner) { - if (b) { - return; - } - } - - List<PhysicalOperator> l = plan.getSuccessors(po_package); - // there should be only one POForEach - if (l == null || l.size() == 0 || l.size() > 1) { - return; - } - - PhysicalOperator 
po_foreach = l.get(0); - if (!(po_foreach instanceof POForEach)) { - return; - } - - boolean foundUDF = false; - List<PhysicalPlan> list = ((POForEach) po_foreach).getInputPlans(); - for (PhysicalPlan p : list) { - PhysicalOperator po = p.getLeaves().get(0); - - // only expression operators are allowed - if (!(po instanceof ExpressionOperator)) { - return; - } - - if (((ExpressionOperator) po).containUDF()) { - foundUDF = true; - } - - if (!check(po)) { - return; - } - } - - if (foundUDF) { - // if all tests are passed, reducer can run in accumulative mode - LOG.info("Reducer is to run in accumulative mode."); - po_package.setAccumulative(); - po_foreach.setAccumulative(); - } - } } \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Wed Apr 12 02:20:20 2017 @@ -53,12 +53,12 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; -@InterfaceAudience.Private +@InterfaceAudience.Private public class SecondaryKeyOptimizerUtil { private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName()); private static boolean isSparkMode; - private SecondaryKeyOptimizerUtil() { + public SecondaryKeyOptimizerUtil() { } @@ -186,7 +186,7 @@ public class SecondaryKeyOptimizerUtil { return result; } - public static SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws
VisitorException { + public SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException { log.trace("Entering SecondaryKeyOptimizerUtil.addSecondaryKeySort"); SecondaryKeyOptimizerInfo secKeyOptimizerInfo = new SecondaryKeyOptimizerInfo(); List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>(); @@ -245,29 +245,7 @@ public class SecondaryKeyOptimizerUtil { } PhysicalOperator root = reduceRoots.get(0); - PhysicalOperator currentNode = null; - if (!isSparkMode) { - if (!(root instanceof POPackage)) { - log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing"); - return null; - } else { - currentNode = root; - } - } else { - if (!(root instanceof POGlobalRearrange)) { - log.debug("Expected reduce root to be a POGlobalRearrange, skip secondary key optimizing"); - return null; - } else { - List<PhysicalOperator> globalRearrangeSuccs = reducePlan - .getSuccessors(root); - if (globalRearrangeSuccs.size() == 1) { - currentNode = globalRearrangeSuccs.get(0); - } else { - log.debug("Expected successor of a POGlobalRearrange is POPackage, skip secondary key optimizing"); - return null; - } - } - } + PhysicalOperator currentNode = getCurrentNode(root,reducePlan); // visit the POForEach of the reduce plan. 
We can have Limit and Filter // in the middle @@ -442,6 +420,16 @@ public class SecondaryKeyOptimizerUtil { return secKeyOptimizerInfo; } + protected PhysicalOperator getCurrentNode(PhysicalOperator root, PhysicalPlan reducePlan) { + PhysicalOperator currentNode = null; + if (!(root instanceof POPackage)) { + log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing"); + } else { + currentNode = root; + } + return currentNode; + } + private static void setSecondaryPlan(PhysicalPlan plan, POLocalRearrange rearrange, SortKeyInfo secondarySortKeyInfo) throws VisitorException { // Put plan to project secondary key to the POLocalRearrange Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Apr 12 02:20:20 2017 @@ -308,7 +308,7 @@ public class HBaseStorage extends LoadFu //so we need check whether UDFContext.getUDFContext().getClientSystemProps() //is null or not, if is null, defaultCaster =STRING_CASTER, otherwise is //UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER) - //Detail see PIG-4611 + //Detail see PIG-4920 String defaultCaster = UDFContext.getUDFContext().getClientSystemProps() != null ? 
UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER) : STRING_CASTER; String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster); if (STRING_CASTER.equalsIgnoreCase(casterOption)) { Modified: pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java Wed Apr 12 02:20:20 2017 @@ -22,6 +22,8 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; +import java.io.Serializable; + /** * Class to hold code common to self spilling bags such as InternalCachedBag */ @@ -30,8 +32,7 @@ import org.apache.pig.classification.Int public abstract class SelfSpillBag extends DefaultAbstractBag { private static final long serialVersionUID = 1L; // SelfSpillBag$MemoryLimits is not serializable - //in spark mode, if we don't set memLimit transient, it will throw NotSerializableExecption(See PIG-4611) - protected transient MemoryLimits memLimit; + protected MemoryLimits memLimit; public SelfSpillBag(int bagCount) { memLimit = new MemoryLimits(bagCount, -1); @@ -49,10 +50,11 @@ public abstract class SelfSpillBag exten * The number of objects that will fit into this memory limit is computed * using the average memory size of the objects whose size is given to this * class. 
+ * In spark mode, MemoryLimits needs implement Serializable interface otherwise NotSerializableExecption will be thrown (See PIG-4611) */ @InterfaceAudience.Private @InterfaceStability.Evolving - public static class MemoryLimits { + public static class MemoryLimits implements Serializable { private long maxMemUsage; private long cacheLimit = Integer.MAX_VALUE; Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Wed Apr 12 02:20:20 2017 @@ -23,11 +23,14 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; public class UDFContext { + private static final Log LOG = LogFactory.getLog(UDFContext.class); private Configuration jconf = null; private HashMap<UDFContextKey, Properties> udfConfs; private Properties clientSysProps; @@ -204,6 +207,17 @@ public class UDFContext { conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps)); } + /* + called by SparkEngineConf#writeObject + */ + public String serialize() { + try { + return ObjectSerializer.serialize(udfConfs); + } catch (IOException e) { + LOG.error("UDFContext#serialize throws error ",e); + return null; + } + } /** * Populate the udfConfs field. 
This function is intended to Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java Wed Apr 12 02:20:20 2017 @@ -36,6 +36,7 @@ public class LOStore extends LogicalRela private boolean isTmpStore; private SortInfo sortInfo; private final StoreFuncInterface storeFunc; + private boolean disambiguationEnabled = true; public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature) { super("LOStore", plan); @@ -43,6 +44,12 @@ public class LOStore extends LogicalRela this.storeFunc = storeFunc; this.signature = signature; } + + public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature, + boolean disambiguationEnabled) { + this(plan, outputFileSpec, storeFunc, signature); + this.disambiguationEnabled = disambiguationEnabled; + } public FileSpec getOutputSpec() { return output; @@ -55,6 +62,17 @@ public class LOStore extends LogicalRela @Override public LogicalSchema getSchema() throws FrontendException { schema = ((LogicalRelationalOperator)plan.getPredecessors(this).get(0)).getSchema(); + + if (!disambiguationEnabled && schema != null && schema.getFields() != null) { + //If requested try and remove parent alias substring including colon(s) + for (LogicalSchema.LogicalFieldSchema field : schema.getFields()) { + if (field.alias == null || !field.alias.contains(":")) { + continue; + } + field.alias = field.alias.substring(field.alias.lastIndexOf(":") + 1); + } + } + return schema; } Modified: 
pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Wed Apr 12 02:20:20 2017 @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import org.apache.pig.FuncSpec; +import org.apache.pig.PigConfiguration; import org.apache.pig.StoreFuncInterface; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; @@ -94,7 +95,10 @@ public class ScalarVisitor extends AllEx StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(interStorageFuncSpec); String sig = LogicalPlanBuilder.newOperatorKey(scope); stoFunc.setStoreFuncUDFContextSignature(sig); - store = new LOStore(lp, fileSpec, stoFunc, sig); + boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties(). 
+ getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT)); + + store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled); store.setTmpStore(true); lp.add( store ); lp.connect( refOp, store ); Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Wed Apr 12 02:20:20 2017 @@ -1003,8 +1003,10 @@ public class LogicalPlanBuilder { fileNameMap.put(fileNameKey, absolutePath); } FileSpec fileSpec = new FileSpec(absolutePath, funcSpec); + boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties(). + getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT)); - LOStore op = new LOStore(plan, fileSpec, stoFunc, signature); + LOStore op = new LOStore(plan, fileSpec, stoFunc, signature, disambiguationEnabled); return buildOp(loc, op, alias, inputAlias, null); } catch(Exception ex) { throw new ParserValidationException(intStream, loc, ex); Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java Wed Apr 12 02:20:20 2017 @@ -70,8 +70,10 @@ public class QueryParserUtils { fileName = removeQuotes( fileName ); FileSpec fileSpec = new FileSpec( 
fileName, funcSpec ); String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope); + boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties(). + getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT)); stoFunc.setStoreFuncUDFContextSignature(sig); - LOStore store = new LOStore(lp, fileSpec, stoFunc, sig); + LOStore store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled); store.setAlias(alias); try { Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java Wed Apr 12 02:20:20 2017 @@ -23,6 +23,7 @@ import java.io.SequenceInputStream; import java.util.Enumeration; import jline.console.ConsoleReader; +import jline.console.history.FileHistory; /** Borrowed from jline.console.internal.ConsoleReaderInputStream. 
However, * we cannot use ConsoleReaderInputStream directly since: @@ -104,6 +105,9 @@ public class ConsoleReaderInputStream ex if (buffer == null) { buffer = reader.readLine().getBytes(); + + //Write current grunt buffer to pig history file + ((FileHistory)reader.getHistory()).flush(); } if (buffer == null) { Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Apr 12 02:20:20 2017 @@ -21,6 +21,7 @@ package org.apache.pig.tools.pigstats.sp import java.util.List; import java.util.Map; +import org.apache.pig.tools.pigstats.*; import scala.Option; import org.apache.hadoop.conf.Configuration; @@ -31,11 +32,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.newplan.PlanVisitor; -import org.apache.pig.tools.pigstats.InputStats; -import org.apache.pig.tools.pigstats.JobStats; -import org.apache.pig.tools.pigstats.OutputStats; -import org.apache.pig.tools.pigstats.PigStats; -import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.spark.executor.ShuffleReadMetrics; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; @@ -72,7 +68,7 @@ public class SparkJobStats extends JobSt long bytes = getOutputSize(poStore, conf); long recordsCount = -1; if (disableCounter == false) { - recordsCount = SparkStatsUtil.getStoreSparkCounterValue(poStore); + recordsCount = SparkStatsUtil.getRecordCount(poStore); } OutputStats 
outputStats = new OutputStats(poStore.getSFile().getFileName(), bytes, recordsCount, success); @@ -88,7 +84,7 @@ public class SparkJobStats extends JobSt long recordsCount = -1; if (disableCounter == false) { - recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po); + recordsCount = SparkStatsUtil.getRecordCount(po); } long bytesRead = -1; if (singleInput && stats.get("BytesRead") != null) { @@ -190,13 +186,13 @@ public class SparkJobStats extends JobSt if (inputMetricExist) { results.put("BytesRead", bytesRead); hdfsBytesRead = bytesRead; - counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead); + counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead); } if (outputMetricExist) { results.put("BytesWritten", bytesWritten); hdfsBytesWritten = bytesWritten; - counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten); + counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten); } if (shuffleReadMetricExist) { @@ -331,7 +327,7 @@ public class SparkJobStats extends JobSt private void initializeHadoopCounter() { counters = new Counters(); Counters.Group fsGrp = counters.addGroup(FS_COUNTER_GROUP, FS_COUNTER_GROUP); - fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_READ, MRPigStatsUtil.HDFS_BYTES_READ, 0); - fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_WRITTEN, MRPigStatsUtil.HDFS_BYTES_WRITTEN, 0); + fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_READ, PigStatsUtil.HDFS_BYTES_READ, 0); + fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_WRITTEN, PigStatsUtil.HDFS_BYTES_WRITTEN, 0); } } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Apr 12 02:20:20 2017 @@ -93,9 +93,7 @@ public class SparkPigStats extends PigSt addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf); jobSparkOperatorMap.put(jobStats, sparkOperator); jobPlan.add(jobStats); - if (e != null) { - jobStats.setBackendException(e); - } + jobStats.setBackendException(e); } public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) { @@ -103,9 +101,7 @@ public class SparkPigStats extends PigSt jobStats.setSuccessful(isSuccess); jobSparkOperatorMap.put(jobStats, sparkOperator); jobPlan.add(jobStats); - if (e != null) { - jobStats.setBackendException(e); - } + jobStats.setBackendException(e); } public void finish() { Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Apr 12 02:20:20 2017 @@ -37,10 +37,10 @@ import org.apache.spark.api.java.JavaSpa public class SparkStatsUtil { - public static final String SPARK_STORE_COUNTER_GROUP = "Spark Store Counters"; - public static final String SPARK_STORE_RECORD_COUNTER = "Output records in "; - public static final String SPARK_INPUT_COUNTER_GROUP = "Spark Input Counters"; - public static final String SPARK_INPUT_RECORD_COUNTER = "Input records from "; + public static final String SPARK_STORE_COUNTER_GROUP = PigStatsUtil.MULTI_STORE_COUNTER_GROUP; + public static final String 
SPARK_STORE_RECORD_COUNTER = PigStatsUtil.MULTI_STORE_RECORD_COUNTER; + public static final String SPARK_INPUT_COUNTER_GROUP = PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP; + public static final String SPARK_INPUT_RECORD_COUNTER = PigStatsUtil.MULTI_INPUTS_RECORD_COUNTER; public static void waitForJobAddStats(int jobID, POStore poStore, SparkOperator sparkOperator, @@ -71,7 +71,7 @@ public class SparkStatsUtil { sparkContext, e); } - public static String getStoreSparkCounterName(POStore store) { + public static String getCounterName(POStore store) { String shortName = PigStatsUtil.getShortName(store.getSFile().getFileName()); StringBuffer sb = new StringBuffer(SPARK_STORE_RECORD_COUNTER); @@ -84,7 +84,7 @@ public class SparkStatsUtil { return sb.toString(); } - public static String getLoadSparkCounterName(POLoad load) { + public static String getCounterName(POLoad load) { String shortName = PigStatsUtil.getShortName(load.getLFile().getFileName()); StringBuffer sb = new StringBuffer(SPARK_INPUT_RECORD_COUNTER); @@ -95,15 +95,15 @@ public class SparkStatsUtil { return sb.toString(); } - public static long getStoreSparkCounterValue(POStore store) { + public static long getRecordCount(POStore store) { SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance(); - return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getStoreSparkCounterName(store)); + return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getCounterName(store)); } - public static long getLoadSparkCounterValue(POLoad load) { + public static long getRecordCount(POLoad load) { SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance(); int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan()); - return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load))/loadersCount; + return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getCounterName(load))/loadersCount; } private static int 
countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){ Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Wed Apr 12 02:20:20 2017 @@ -1003,7 +1003,7 @@ public class TestGrunt { } else { //In spark mode, We wrap ExecException to RunTimeException and is thrown out in JobGraphBuilder#sparkOperToRDD, //So unwrap the exception here - assertTrue(((ExecException) e.getCause().getCause()).getErrorCode() == 6017); + assertTrue(((ExecException) e.getCause()).getErrorCode() == 6017); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java Wed Apr 12 02:20:20 2017 @@ -651,7 +651,7 @@ public class TestProjectRange { }; Schema s = pigServer.dumpSchema("f"); Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), - Util.isSparkExecType(cluster.getExecType())); + Util.isSparkExecType(cluster.getExecType())); } /** @@ -738,8 +738,8 @@ public class TestProjectRange { "(1,{(11,21,31,41,51),(10,20,30,40,50)})", }; Schema s = pigServer.dumpSchema("f"); - Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), - Util.isSparkExecType(cluster.getExecType())); + Util.checkQueryOutputs(it, 
expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } @@ -928,8 +928,8 @@ public class TestProjectRange { " g = group l1 by .. c, l2 by .. c;" ; String expectedSchStr = "grp: (a: int,b: long,c: int)," + - "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," + - "l2: {t : (a: int,b: long,c: int,d: int,e: int)}"; + "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," + + "l2: {t : (a: int,b: long,c: int,d: int,e: int)}"; Schema expectedSch = getCleanedGroupSchema(expectedSchStr); compileAndCompareSchema(expectedSch, query, "g"); @@ -948,8 +948,8 @@ public class TestProjectRange { }; Iterator<Tuple> it = pigServer.openIterator("g"); Schema s = pigServer.dumpSchema("g"); - Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), - Util.isSparkExecType(cluster.getExecType())); + Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } /** @@ -1017,7 +1017,7 @@ public class TestProjectRange { Iterator<Tuple> it = pigServer.openIterator("g"); Schema s = pigServer.dumpSchema("g"); Util.checkQueryOutputs(it,expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), - Util.isSparkExecType(cluster.getExecType())); + Util.isSparkExecType(cluster.getExecType())); } @Test @@ -1068,8 +1068,8 @@ public class TestProjectRange { }; Iterator<Tuple> it = pigServer.openIterator("lim"); Schema s = pigServer.dumpSchema("lim"); - Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), - Util.isSparkExecType(cluster.getExecType())); + Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } @@ -1132,7 +1132,7 @@ public class TestProjectRange { Iterator<Tuple> it = pigServer.openIterator("g"); Schema s = pigServer.dumpSchema("g"); Util.checkQueryOutputs(it, expectedRes, 
org.apache.pig.newplan.logical.Util.translateSchema(s), - Util.isSparkExecType(cluster.getExecType())); + Util.isSparkExecType(cluster.getExecType())); } private void setAliasesToNull(Schema schema) { @@ -1171,8 +1171,8 @@ public class TestProjectRange { }; Iterator<Tuple> it = pigServer.openIterator("j"); Schema s = pigServer.dumpSchema("j"); - Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), - Util.isSparkExecType(cluster.getExecType())); + Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } @Test @@ -1200,8 +1200,8 @@ public class TestProjectRange { }; Iterator<Tuple> it = pigServer.openIterator("j"); Schema s = pigServer.dumpSchema("j"); - Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), - Util.isSparkExecType(cluster.getExecType())); + Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(cluster.getExecType())); } @Test @@ -1213,7 +1213,7 @@ public class TestProjectRange { " l2 = load '" + INP_FILE_5FIELDS + "';" + " g = cogroup l1 by ($0 .. ), l2 by ($0 .. );"; Util.checkExceptionMessage(query, "g", "Cogroup/Group by '*' or 'x..' 
" + - "(range of columns to the end) " + + "(range of columns to the end) " + "is only allowed if the input has a schema"); } Modified: pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java Wed Apr 12 02:20:20 2017 @@ -611,7 +611,7 @@ public class TestPruneColumn { "({(5)},{})" }; Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), - Util.isSparkExecType(Util.getLocalTestMode())); + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(emptyLogFileMessage()); } @@ -963,7 +963,7 @@ public class TestPruneColumn { "((2,5,2))" }; Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")), - Util.isSparkExecType(Util.getLocalTestMode())); + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(emptyLogFileMessage()); } @@ -1513,7 +1513,7 @@ public class TestPruneColumn { "({(3),(3),(3)},3)" }; Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")), - Util.isSparkExecType(Util.getLocalTestMode())); + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"})); pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY); @@ -1535,7 +1535,7 @@ public class TestPruneColumn { "(5,{(2,5,2)})" }; Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")) - ,Util.isSparkExecType(Util.getLocalTestMode())); + , Util.isSparkExecType(Util.getLocalTestMode())); 
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"})); } @@ -1865,7 +1865,7 @@ public class TestPruneColumn { "(2,5,2,2)" }; Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")), - Util.isSparkExecType(Util.getLocalTestMode())); + Util.isSparkExecType(Util.getLocalTestMode())); assertTrue(emptyLogFileMessage()); Modified: pig/branches/spark/test/org/apache/pig/test/TestSchema.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSchema.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestSchema.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestSchema.java Wed Apr 12 02:20:20 2017 @@ -29,7 +29,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.UUID; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.ResourceSchema; import org.apache.pig.data.DataType; @@ -42,10 +44,28 @@ import org.apache.pig.impl.util.Utils; import org.apache.pig.newplan.logical.relational.LogicalSchema; import org.apache.pig.newplan.logical.relational.LogicalSchema.MergeMode; import org.apache.pig.parser.ParserException; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; public class TestSchema { + private static MiniGenericCluster cluster; + private static PigServer pigServer; + + @BeforeClass + public static void setupTestCluster() throws Exception { + cluster = MiniGenericCluster.buildCluster(); + pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + } + + @AfterClass + public static void tearDownTestCluster() throws Exception { + cluster.shutDown(); + } + @Test public void testSchemaEqual1() { @@ -660,8 +680,6 @@ public class 
TestSchema { @Test public void testSchemaSerialization() throws IOException { - MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); String inputFileName = "testSchemaSerialization-input.txt"; String[] inputData = new String[] { "foo\t1", "hello\t2" }; Util.createInputFile(cluster, inputFileName, inputData); @@ -673,7 +691,6 @@ public class TestSchema { Tuple t = it.next(); assertEquals("{a: {(f1: chararray,f2: int)}}", t.get(0)); } - cluster.shutDown(); } @Test @@ -938,4 +955,79 @@ public class TestSchema { assertTrue(schemaString.equals(s2)); } } + + @Test + public void testDisabledDisambiguationContainsNoColons() throws IOException { + resetDisambiguationTestPropertyOverride(); + + String inputFileName = "testPrepend-input.txt"; + String[] inputData = new String[]{"apple\t1\tred", "orange\t2\torange", "kiwi\t3\tgreen", "orange\t4\torange"}; + Util.createInputFile(cluster, inputFileName, inputData); + + String script = "A = LOAD '" + inputFileName + "' AS (fruit:chararray, foo:int, color: chararray);" + + "B = LOAD '" + inputFileName + "' AS (id:chararray, bar:int);" + + "C = GROUP A BY (fruit,color);" + + "D = FOREACH C GENERATE FLATTEN(group), AVG(A.foo);" + + "D2 = FOREACH C GENERATE FLATTEN(group), AVG(A.foo) as avgFoo;" + + "E = JOIN B BY id, D BY group::fruit;" + + "F = UNION ONSCHEMA B, D2;" + + "G = CROSS B, D2;"; + + Util.registerMultiLineQuery(pigServer, script); + + //Prepending should happen with default settings + assertEquals("{B::id: chararray,B::bar: int,D::group::fruit: chararray,D::group::color: chararray,double}", pigServer.dumpSchema("E").toString()); + + //Override prepend property setting (check for flatten, join) + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false"); + assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,double}", 
pigServer.dumpSchema("E").toString()); + assertTrue(pigServer.openIterator("E").hasNext()); + + //Check for union and cross + assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("F").toString()); + assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("G").toString()); + + } + + @Test + public void testEnabledDisambiguationPassesForDupeAliases() throws IOException { + resetDisambiguationTestPropertyOverride(); + + checkForDupeAliases(); + + //Should pass with default settings + assertEquals("{A::id: chararray,A::val: int,B::id: chararray,B::val: int}", pigServer.dumpSchema("C").toString()); + assertTrue(pigServer.openIterator("C").hasNext()); + } + + @Test + public void testDisabledDisambiguationFailsForDupeAliases() throws IOException { + resetDisambiguationTestPropertyOverride(); + + try { + checkForDupeAliases(); + //Should fail with prepending disabled + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false"); + pigServer.dumpSchema("C"); + } catch (FrontendException e){ + Assert.assertEquals("Duplicate schema alias: id in \"fake\"",e.getCause().getMessage()); + } + } + + private static void checkForDupeAliases() throws IOException { + String inputFileName = "testPrependFail-input" + UUID.randomUUID().toString() + ".txt"; + String[] inputData = new String[]{"foo\t1", "bar\t2"}; + Util.createInputFile(cluster, inputFileName, inputData); + + String script = "A = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" + + "B = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" + + "C = JOIN A by id, B by id;"; + + Util.registerMultiLineQuery(pigServer, script); + } + + private static void resetDisambiguationTestPropertyOverride() { + //Reset possible overrides + pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE); + } } Modified: 
pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Wed Apr 12 02:20:20 2017 @@ -270,7 +270,7 @@ public abstract class TestSecondarySort "(1,{(1,2,3),(1,2,4),(1,3,4)})" }; Util.checkQueryOutputs(iter, expected, org.apache - .pig.newplan.logical.Util.translateSchema(s), + .pig.newplan.logical.Util.translateSchema(s), Util.isSparkExecType(Util.getLocalTestMode())); Util.deleteFile(cluster, clusterPath); } Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1791060&r1=1791059&r2=1791060&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Wed Apr 12 02:20:20 2017 @@ -203,6 +203,25 @@ public class TestTezCompiler { resetScope(); resetFileLocalizer(); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld"); + + // Three levels of splits - a, a1 and a2. + // One split above and one split below a1 which is the split to be replaced with tmp store. 
+ query = + "a = load 'file:///tmp/input';" + + "store a into 'file:///tmp/pigoutput/Dir0';" + + "a1 = filter a by $0 == 5;" + + "store a1 into 'file:///tmp/pigoutput/Dir1';" + + "a2 = distinct a1;" + + "store a2 into 'file:///tmp/pigoutput/Dir2';" + + "a3 = group a2 by $0;" + + "store a3 into 'file:///tmp/pigoutput/Dir3';" + + "b = load 'file:///tmp/pigoutput/Dir3';" + + "c = join a1 by $0, b by $0;" + + "store c into 'file:///tmp/pigoutput/Dir4';"; + + resetScope(); + resetFileLocalizer(); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-7.gld"); } @Test
