Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java Wed Feb 22 09:43:41 2017 @@ -27,6 +27,8 @@ import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.StringReader; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Hashtable; import java.util.List; import java.util.Map; @@ -40,20 +42,26 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.validator.BlackAndWhitelistFilter; import org.apache.pig.validator.PigCommandFilter; -import org.python.google.common.base.Preconditions; public class PreprocessorContext { - private Map<String, String> param_val; + private int tableinitsize = 10; + private Deque<Map<String,String>> param_val_stack; - // used internally to detect when a param is set multiple times, - // but it set with the same value so it's ok not to log a warning - private Map<String, String> param_source; - private PigContext pigContext; public Map<String, String> getParamVal() { - return param_val; + Map <String, String> ret = new Hashtable <String, String>(tableinitsize); + + //stack (deque) iterates LIFO + for (Map <String, String> map : param_val_stack ) { + for (Map.Entry<String, String> entry : map.entrySet()) { + if( ! ret.containsKey(entry.getKey()) ) { + ret.put(entry.getKey(), entry.getValue()); + } + } + } + return ret; } private final Log log = LogFactory.getLog(getClass()); @@ -63,24 +71,15 @@ public class PreprocessorContext { * smaller number only impacts performance */ public PreprocessorContext(int limit) { - param_val = new Hashtable<String, String> (limit); - param_source = new Hashtable<String, String> (limit); - } - - public PreprocessorContext(Map<String, String> paramVal) { - param_val = paramVal; - param_source = new Hashtable<String, String>(paramVal); + tableinitsize = limit; + param_val_stack = new ArrayDeque<Map<String,String>> (); + param_val_stack.push(new Hashtable<String, String> (tableinitsize)); } public void setPigContext(PigContext context) { this.pigContext = context; } - /* - public void processLiteral(String key, String val) { - processLiteral(key, val, true); - } */ - /** * This method generates parameter value by running specified command * @@ -102,20 +101,35 @@ public class PreprocessorContext { processOrdLine(key, val, true); } - /* - public void processLiteral(String key, String val, Boolean overwrite) { + public void paramScopePush() { + param_val_stack.push( new Hashtable<String, String> (tableinitsize) ); + } - if (param_val.containsKey(key)) { - if (overwrite) { - log.warn("Warning : Multiple values found for " + key + ". Using value " + val); - } else { - return; + public void paramScopePop() { + param_val_stack.pop(); + } + + public boolean paramval_containsKey(String key) { + for (Map <String, String> map : param_val_stack ) { + if( map.containsKey(key) ) { + return true; } } + return false; + } - String sub_val = substitute(val); - param_val.put(key, sub_val); - } */ + public String paramval_get(String key) { + for (Map <String, String> map : param_val_stack ) { + if( map.containsKey(key) ) { + return map.get(key); + } + } + return null; + } + + public void paramval_put(String key, String value) { + param_val_stack.peek().put(key, value); + } /** * This method generates parameter value by running specified command @@ -129,21 +143,21 @@ public class PreprocessorContext { filter.validate(PigCommandFilter.Command.SH); } - if (param_val.containsKey(key)) { - if (param_source.get(key).equals(val) || !overwrite) { - return; - } else { - log.warn("Warning : Multiple values found for " + key - + ". Using value " + val); - } + if (paramval_containsKey(key) && !overwrite) { + return; } - param_source.put(key, val); - val = val.substring(1, val.length()-1); //to remove the backticks String sub_val = substitute(val); sub_val = executeShellCommand(sub_val); - param_val.put(key, sub_val); + + if (paramval_containsKey(key) && !paramval_get(key).equals(sub_val) ) { + //(boolean overwrite is always true here) + log.warn("Warning : Multiple values found for " + key + " command `" + val + "`. " + + "Previous value " + paramval_get(key) + ", now using value " + sub_val); + } + + paramval_put(key, sub_val); } public void validate(String preprocessorCmd) throws FrontendException { @@ -175,18 +189,18 @@ public class PreprocessorContext { */ public void processOrdLine(String key, String val, Boolean overwrite) throws ParameterSubstitutionException { - if (param_val.containsKey(key)) { - if (param_source.get(key).equals(val) || !overwrite) { + String sub_val = substitute(val, key); + if (paramval_containsKey(key)) { + if (paramval_get(key).equals(sub_val) || !overwrite) { return; } else { - log.warn("Warning : Multiple values found for " + key + ". Using value " + val); + log.warn("Warning : Multiple values found for " + key + + ". Previous value " + paramval_get(key) + + ", now using value " + sub_val); } } - param_source.put(key, val); - - String sub_val = substitute(val, key); - param_val.put(key, sub_val); + paramval_put(key, sub_val); } @@ -318,7 +332,7 @@ public class PreprocessorContext { while (bracketKeyMatcher.find()) { if ( (bracketKeyMatcher.start() == 0) || (line.charAt( bracketKeyMatcher.start() - 1)) != '\\' ) { key = bracketKeyMatcher.group(1); - if (!(param_val.containsKey(key))) { + if (!(paramval_containsKey(key))) { String message; if (parentKey == null) { message = "Undefined parameter : " + key; @@ -327,7 +341,7 @@ public class PreprocessorContext { } throw new ParameterSubstitutionException(message); } - val = param_val.get(key); + val = paramval_get(key); if (val.contains("$")) { val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$"); } @@ -345,7 +359,7 @@ public class PreprocessorContext { // for escaped vars of the form \$<id> if ( (keyMatcher.start() == 0) || (line.charAt( keyMatcher.start() - 1)) != '\\' ) { key = keyMatcher.group(1); - if (!(param_val.containsKey(key))) { + if (!(paramval_containsKey(key))) { String message; if (parentKey == null) { message = "Undefined parameter : " + key; @@ -354,7 +368,7 @@ public class PreprocessorContext { } throw new ParameterSubstitutionException(message); } - val = param_val.get(key); + val = paramval_get(key); if (val.contains("$")) { val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$"); }
Modified: pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Wed Feb 22 09:43:41 2017 @@ -23,6 +23,7 @@ options { STATIC = false; // Case is ignored in keywords IGNORE_CASE = true; + // DEBUG_PARSER = true; JAVA_UNICODE_ESCAPE = true; } @@ -36,7 +37,7 @@ import java.util.List; import java.util.ArrayList; import org.apache.pig.impl.util.StringUtils; -import jline.ConsoleReader; +import jline.console.ConsoleReader; public abstract class PigScriptParser { @@ -217,7 +218,7 @@ TOKEN_MGR_DECLS : { { /*System.err.print(">> "); System.err.flush();*/ - consoleReader.setDefaultPrompt(">> "); + consoleReader.setPrompt(">> "); } } @@ -267,7 +268,7 @@ TOKEN_MGR_DECLS : { <"'"> {prevState = PIG_START;} : IN_STRING | <"`"> {prevState = PIG_START;} : IN_COMMAND | <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = PIG_START;} : SCHEMA_DEFINITION -| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+ > {prevState = PIG_START;} : GENERATE +| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+ > {prevState = PIG_START;} : GENERATE | <"{"> {pigBlockLevel = 1;} : IN_BLOCK | <"}"> {if (true) throw new TokenMgrError("Unmatched '}'", TokenMgrError.LEXICAL_ERROR);} | <";"> : PIG_END @@ -292,7 +293,8 @@ TOKEN_MGR_DECLS : { <IN_STRING> MORE : { - <"\\'"> + <"\\\\"> +| <"\\'"> | <"'"> { SwitchTo(prevState);} | <("\n" | "\r" | "\r\n")> {secondary_prompt();} | <(~[])> @@ -395,7 +397,7 @@ TOKEN_MGR_DECLS : { { <"\""> {prevState = IN_BLOCK;} : IN_DOUBLE_QUOTED_STRING | <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = IN_BLOCK;} : SCHEMA_DEFINITION -| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+> {prevState = IN_BLOCK;} : GENERATE +| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+> {prevState = IN_BLOCK;} : GENERATE | <"{"> {pigBlockLevel++;} | <"}"(";")?> {pigBlockLevel--; if (pigBlockLevel == 0) SwitchTo(PIG_END);} | <"'"> {prevState = IN_BLOCK;} : IN_STRING Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java Wed Feb 22 09:43:41 2017 @@ -147,6 +147,11 @@ final class EmbeddedPigStats extends Pig } @Override + public String getDisplayString() { + return null; + } + + @Override public long getProactiveSpillCountObjects() { throw new UnsupportedOperationException(); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java Wed Feb 22 09:43:41 2017 @@ -87,6 +87,11 @@ public class EmptyPigStats extends PigSt } @Override + public String getDisplayString() { + return null; + } + + @Override public JobGraph getJobGraph() { return emptyJobPlan; } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java Wed Feb 22 09:43:41 2017 @@ -134,6 +134,11 @@ public abstract class PigStats { } /** + * Returns the display message in pig grunt + */ + public abstract String getDisplayString(); + + /** * Returns the DAG of jobs spawned by the script */ public JobGraph getJobGraph() { @@ -265,6 +270,13 @@ public abstract class PigStats { return ScriptState.get().getPigVersion(); } + /** + * Returns the contents of the script that was run. + */ + public String getScript() { + return ScriptState.get().getScript(); + } + public String getScriptId() { return ScriptState.get().getId(); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Wed Feb 22 09:43:41 2017 @@ -24,7 +24,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats; /** @@ -71,7 +71,7 @@ public class PigStatsUtil { */ @Deprecated public static final String FS_COUNTER_GROUP - = HadoopShims.getFsCounterGroupName(); + = MRPigStatsUtil.FS_COUNTER_GROUP; /** * Returns an empty PigStats object Use of this method is not advised as it Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Wed Feb 22 09:43:41 2017 @@ -133,6 +133,8 @@ public abstract class ScriptState { MERGE_SPARSE_JOIN, REPLICATED_JOIN, SKEWED_JOIN, + BUILD_BLOOM, + FILTER_BLOOM, HASH_JOIN, COLLECTED_GROUP, MERGE_COGROUP, @@ -312,7 +314,7 @@ public abstract class ScriptState { maxScriptSize = Integer.valueOf(prop); } } - + this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize) : script; @@ -485,6 +487,10 @@ public abstract class ScriptState { public void visit(LOJoin op) { if (op.getJoinType() == JOINTYPE.HASH) { feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); + } else if (op.getJoinType() == JOINTYPE.BLOOM) { + feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); + feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal()); + feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal()); } else if (op.getJoinType() == JOINTYPE.MERGE) { feature.set(PIG_FEATURE.MERGE_JOIN.ordinal()); } else if (op.getJoinType() == JOINTYPE.MERGESPARSE) { @@ -506,6 +512,7 @@ public abstract class ScriptState { feature.set(PIG_FEATURE.RANK.ordinal()); } + @Override public void visit(LOSort op) { feature.set(PIG_FEATURE.ORDER_BY.ordinal()); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Wed Feb 22 09:43:41 2017 @@ -32,15 +32,16 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.TaskReport; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigCounters; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.impl.io.FileSpec; @@ -53,6 +54,8 @@ import org.apache.pig.tools.pigstats.Out import org.apache.pig.tools.pigstats.PigStats.JobGraph; import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter; +import org.python.google.common.collect.Lists; + /** * This class encapsulates the runtime statistics of a MapReduce job. @@ -281,7 +284,7 @@ public final class MRJobStats extends Jo void addCounters(Job job) { try { - counters = HadoopShims.getCounters(job); + counters = getCounters(job); } catch (IOException e) { LOG.warn("Unable to get job counters", e); } @@ -349,13 +352,13 @@ public final class MRJobStats extends Jo void addMapReduceStatistics(Job job) { Iterator<TaskReport> maps = null; try { - maps = HadoopShims.getTaskReports(job, TaskType.MAP); + maps = getTaskReports(job, TaskType.MAP); } catch (IOException e) { LOG.warn("Failed to get map task report", e); } Iterator<TaskReport> reduces = null; try { - reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE); + reduces = getTaskReports(job, TaskType.REDUCE); } catch (IOException e) { LOG.warn("Failed to get reduce task report", e); } @@ -515,4 +518,35 @@ public final class MRJobStats extends Jo inputs.add(is); } + public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException { + if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) { + LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID()); + return null; + } + Cluster cluster = new Cluster(job.getJobConf()); + try { + org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID()); + if (mrJob == null) { // In local mode, mrJob will be null + mrJob = job.getJob(); + } + org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type); + return Lists.newArrayList(reports).iterator(); + } catch (InterruptedException ir) { + throw new IOException(ir); + } + } + + public static Counters getCounters(Job job) throws IOException { + try { + Cluster cluster = new Cluster(job.getJobConf()); + org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID()); + if (mrJob == null) { // In local mode, mrJob will be null + mrJob = job.getJob(); + } + return new Counters(mrJob.getCounters()); + } catch (Exception ir) { + throw new IOException(ir); + } + } + } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Wed Feb 22 09:43:41 2017 @@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.classification.InterfaceAudience.Private; import org.apache.pig.impl.PigContext; import org.apache.pig.tools.pigstats.JobStats; @@ -51,7 +50,7 @@ public class MRPigStatsUtil extends PigS public static final String TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter"; public static final String FS_COUNTER_GROUP - = HadoopShims.getFsCounterGroupName(); + = "org.apache.hadoop.mapreduce.FileSystemCounter"; private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class); Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java Wed Feb 22 09:43:41 2017 @@ -207,13 +207,18 @@ public final class SimplePigStats extend } void display() { + LOG.info(getDisplayString()); + } + + @Override + public String getDisplayString() { if (returnCode == ReturnCode.UNKNOWN) { LOG.warn("unknown return code, can't display the results"); - return; + return ""; } if (pigContext == null) { LOG.warn("unknown exec type, don't display the results"); - return; + return ""; } SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); @@ -276,7 +281,7 @@ public final class SimplePigStats extend sb.append("\nJob DAG:\n").append(jobPlan.toString()); - LOG.info("Script Statistics: \n" + sb.toString()); + return "Script Statistics: \n" + sb.toString(); } void mapMROperToJob(MapReduceOper mro, Job job) { Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Feb 22 09:43:41 2017 @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,7 +39,6 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; -import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.ScriptState; import org.apache.spark.api.java.JavaSparkContext; @@ -115,29 +113,35 @@ public class SparkPigStats extends PigSt } private void display() { + LOG.info(getDisplayString()); + } + + public String getDisplayString() { + StringBuilder sb = new StringBuilder(); Iterator<JobStats> iter = jobPlan.iterator(); while (iter.hasNext()) { - SparkJobStats js = (SparkJobStats)iter.next(); + SparkJobStats js = (SparkJobStats) iter.next(); if (jobSparkOperatorMap.containsKey(js)) { SparkOperator sparkOperator = jobSparkOperatorMap.get(js); js.setAlias(sparkOperator); } - LOG.info( "Spark Job [" + js.getJobId() + "] Metrics"); + sb.append("Spark Job [" + js.getJobId() + "] Metrics"); Map<String, Long> stats = js.getStats(); if (stats == null) { - LOG.info("No statistics found for job " + js.getJobId()); - return; + sb.append("No statistics found for job " + js.getJobId()); + return sb.toString(); } Iterator statIt = stats.entrySet().iterator(); while (statIt.hasNext()) { - Map.Entry pairs = (Map.Entry)statIt.next(); - LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue()); + Map.Entry pairs = (Map.Entry) statIt.next(); + sb.append("\t" + pairs.getKey() + " : " + pairs.getValue()); } - for (InputStats inputStat : js.getInputs()){ - LOG.info("\t"+inputStat.getDisplayString()); + for (InputStats inputStat : js.getInputs()) { + sb.append("\t" + inputStat.getDisplayString()); } } + return sb.toString(); } @Override @@ -217,4 +221,4 @@ public class SparkPigStats extends PigSt sparkOperatorsSet.add(sparkOperator); } -} +} \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Feb 22 09:43:41 2017 @@ -18,12 +18,8 @@ package org.apache.pig.tools.pigstats.spark; -import java.util.List; - -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.hadoop.mapred.JobConf; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener; import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; @@ -101,21 +97,7 @@ public class SparkStatsUtil { public static long getLoadSparkCounterValue(POLoad load) { SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance(); - int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan()); - return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load))/loadersCount; - } - - private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){ - List<PhysicalOperator> successors = pp.getSuccessors(op); - if (successors == null || successors.size()==0) return 1; - for (PhysicalOperator successor : successors){ - if (successor instanceof POSplit){ - return ((POSplit)successor).getPlans().size(); - }else{ - return countCoLoadsIfInSplit(successor,pp); - } - } - return 1; + return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load)); } public static boolean isJobSuccess(int jobID, Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Wed Feb 22 09:43:41 2017 @@ -245,7 +245,11 @@ public class TezDAGStats extends JobStat OutputStats existingOut = outputsByLocation.get(output.getLocation()); // In case of multistore, bytesWritten is already calculated // from size of all the files in the output directory. - if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) { + // So use that if there is a combination of multistore and single store + if (output.getPOStore().isMultiStore()) { + existingOut.setBytes(output.getBytes()); + existingOut.setPOStore(output.getPOStore()); + } else if (!existingOut.getPOStore().isMultiStore() && output.getBytes() > -1) { long bytes = existingOut.getBytes() > -1 ? (existingOut.getBytes() + output.getBytes()) : output.getBytes(); Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Wed Feb 22 09:43:41 2017 @@ -117,6 +117,11 @@ public class TezPigScriptStats extends P } private void display() { + LOG.info(getDisplayString()); + } + + @Override + public String getDisplayString() { SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); StringBuilder sb = new StringBuilder(); sb.append("\n"); @@ -170,7 +175,7 @@ public class TezPigScriptStats extends P for (OutputStats os : getOutputStats()) { sb.append(os.getDisplayString().trim()).append("\n"); } - LOG.info("Script Statistics:\n" + sb.toString()); + return "Script Statistics:\n" + sb.toString(); } /** Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Wed Feb 22 09:43:41 2017 @@ -275,6 +275,12 @@ public class TezScriptState extends Scri if (tezOp.isRegularJoin()) { feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); } + if (tezOp.isBuildBloom()) { + feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal()); + } + if (tezOp.isFilterBloom()) { + feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal()); + } if (tezOp.isUnion()) { feature.set(PIG_FEATURE.UNION.ordinal()); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed Feb 22 09:43:41 2017 @@ -22,6 +22,7 @@ import static org.apache.pig.tools.pigst import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -289,13 +290,19 @@ public class TezVertexStats extends JobS } // Split followed by union will have multiple stores writing to same location - Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>(); + Map<String, List<POStore>> uniqueOutputs = new HashMap<String, List<POStore>>(); for (POStore sto : stores) { POStoreTez store = (POStoreTez) sto; - uniqueOutputs.put(store.getOutputKey(), store); + List<POStore> stores = uniqueOutputs.get(store.getOutputKey()); + if (stores == null) { + stores = new ArrayList<POStore>(); + } + stores.add(store); + uniqueOutputs.put(store.getOutputKey(), stores); } - for (POStore sto : uniqueOutputs.values()) { + for (List<POStore> stores : uniqueOutputs.values()) { + POStore sto = stores.get(0); if (sto.isTmpStore()) { continue; } @@ -304,11 +311,16 @@ public class TezVertexStats extends JobS String filename = sto.getSFile().getFileName(); if (counters != null) { if (msGroup != null) { - Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto)); - if (n != null) records = n; - } - if (records == -1) { - records = outputRecords; + long n = 0; + Long val = null; + for (POStore store : stores) { + val = msGroup.get(PigStatsUtil.getMultiStoreCounterName(store)); + // Tez removes 0 value counters for efficiency. + if (val != null) { + n += val; + }; + } + records = n; } if (isSuccessful() && records == -1) { // Tez removes 0 value counters for efficiency. @@ -338,13 +350,13 @@ public class TezVertexStats extends JobS @Override @Deprecated public int getNumberMaps() { - return this.isMapOpts ? numTasks : -1; + return this.isMapOpts ? numTasks : 0; } @Override @Deprecated public int getNumberReduces() { - return this.isMapOpts ? -1 : numTasks; + return this.isMapOpts ? 0 : numTasks; } @Override @@ -386,25 +398,25 @@ public class TezVertexStats extends JobS @Override @Deprecated public long getMapInputRecords() { - return this.isMapOpts ? numInputRecords : -1; + return this.isMapOpts ? numInputRecords : 0; } @Override @Deprecated public long getMapOutputRecords() { - return this.isMapOpts ? numOutputRecords : -1; + return this.isMapOpts ? numOutputRecords : 0; } @Override @Deprecated public long getReduceInputRecords() { - return this.isMapOpts ? -1 : numInputRecords; + return numReduceInputRecords; } @Override @Deprecated public long getReduceOutputRecords() { - return this.isMapOpts ? -1 : numOutputRecords; + return this.isMapOpts ? 0 : numOutputRecords; } @Override Modified: pig/branches/spark/src/pig-default.properties URL: http://svn.apache.org/viewvc/pig/branches/spark/src/pig-default.properties?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/pig-default.properties (original) +++ pig/branches/spark/src/pig-default.properties Wed Feb 22 09:43:41 2017 @@ -61,4 +61,8 @@ pig.stats.output.size.reader.unsupported pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage -pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage \ No newline at end of file +pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage + +pig.ats.enabled=true + +pig.tez.configure.am.memory=true Added: pig/branches/spark/start-build-env.sh URL: http://svn.apache.org/viewvc/pig/branches/spark/start-build-env.sh?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/start-build-env.sh (added) +++ pig/branches/spark/start-build-env.sh Wed Feb 22 09:43:41 2017 @@ -0,0 +1,63 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e # exit on error + +cd "$(dirname "$0")" # connect to root + +docker build -t pig-build dev-support/docker + +if [ "$(uname -s)" == "Linux" ]; then + USER_NAME=${SUDO_USER:=${USER}} + USER_ID=$(id -u "${USER_NAME}") + GROUP_ID=$(id -g "${USER_NAME}") +else # boot2docker uid and gid + USER_NAME=${USER} + USER_ID=1000 + GROUP_ID=50 +fi + +docker build -t "pig-build-${USER_NAME}" - <<UserSpecificDocker +FROM pig-build +RUN bash configure-for-user.sh ${USER_NAME} ${USER_ID} ${GROUP_ID} "$(fgrep vboxsf /etc/group)" +UserSpecificDocker + +# By mapping the .m2 directory you can do an mvn install from +# within the container and use the result on your normal +# system. This also is a significant speedup in subsequent +# builds because the dependencies are downloaded only once. +# Same with the .ivy2 directory + +DOCKER="docker run --rm=true -t -i" +DOCKER=${DOCKER}" -u ${USER_NAME}" + +# Work in the current directory +DOCKER=${DOCKER}" -v ${PWD}:/home/${USER_NAME}/pig" +DOCKER=${DOCKER}" -w /home/${USER_NAME}/pig" + +# Mount persistent caching of 'large' downloads +DOCKER=${DOCKER}" -v ${HOME}/.m2:/home/${USER_NAME}/.m2" +DOCKER=${DOCKER}" -v ${HOME}/.ivy2:/home/${USER_NAME}/.ivy2" + +# What do we run? +DOCKER=${DOCKER}" --name pig-build-${USER_NAME}-$$" +DOCKER=${DOCKER}" pig-build-${USER_NAME}" +DOCKER=${DOCKER}" bash" + +# Now actually start it +${DOCKER} + Modified: pig/branches/spark/test/e2e/pig/build.xml URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/build.xml?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/build.xml (original) +++ pig/branches/spark/test/e2e/pig/build.xml Wed Feb 22 09:43:41 2017 @@ -27,9 +27,8 @@ <property name="hive.lib.dir" value="${pig.base.dir}/build/ivy/lib/Pig"/> - <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S"> - <equals arg1="${hadoopversion}" arg2="23" /> - </condition> + <property name="hadoopversion" value="2" /> + <property name="hive.hadoop.shims.version" value="0.23" /> <property name="mvnrepo" value="http://repo2.maven.org/maven2"/> @@ -61,6 +60,7 @@ <property name="harness.PH_LOCAL" value="."/> <property name="harness.PH_OUT" value="."/> <property name="harness.PERL5LIB" value="./libexec"/> + <property name="harness.user.home" value="/user/pig" /> <property name="test.location" value="${basedir}/testdist"/> <property name="benchmark.location" value="${test.location}/benchmarks"/> @@ -137,6 +137,7 @@ <path path="${test.location}/tests/multiquery.conf"/> <path path="${test.location}/tests/negative.conf"/> <path path="${test.location}/tests/nightly.conf"/> + <path path="${test.location}/tests/join.conf"/> <path path="${test.location}/tests/streaming.conf"/> <path path="${test.location}/tests/streaming_local.conf"/> <path path="${test.location}/tests/turing_jython.conf"/> @@ -309,6 +310,7 @@ <env key="PH_HIVE_LIB_DIR" value="${hive.lib.dir}"/> <env key="PH_HIVE_VERSION" value="${hive.version}"/> <env key="PH_HIVE_SHIMS_VERSION" value="${hive.hadoop.shims.version}"/> + <env key="PH_HDFS_BASE" value="${harness.user.home}" /> <env key="HARNESS_CONF" value="${harness.conf.file}"/> <env key="HADOOP_HOME" value="${harness.hadoop.home}"/> <env key="HADOOP_PREFIX" value="${HADOOP_PREFIX}"/> @@ -369,6 +371,7 @@ <env key="PH_CLUSTER_BIN" value="${harness.cluster.bin}"/> <env key="HARNESS_CONF" value="${harness.conf.file}"/> <env key="HADOOP_HOME" value="${harness.hadoop.home}"/> + <env key="PH_HDFS_BASE" value="${harness.user.home}" /> <arg value="./test_harness.pl"/> <arg value="-deploycfg"/> Modified: pig/branches/spark/test/e2e/pig/conf/spark.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/spark.conf?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/conf/spark.conf (original) +++ pig/branches/spark/test/e2e/pig/conf/spark.conf Wed Feb 22 09:43:41 2017 @@ -30,8 +30,8 @@ my $hdfsBase = $ENV{PH_HDFS_BASE} || "/u $cfg = { #HDFS - 'inpathbase' => "$ENV{PH_ROOT}/data" - , 'outpathbase' => "$ENV{PH_ROOT}/testout" + 'inpathbase' => "$hdfsBase/test/data" + , 'outpathbase' => "$hdfsBase/out" #LOCAL , 'localinpathbase' => "$ENV{PH_LOCAL}/in" @@ -55,7 +55,7 @@ $cfg = { , 'hcatbin' => "$ENV{HCAT_BIN}" , 'usePython' => "$ENV{PIG_USE_PYTHON}" , 'exectype' => 'spark' - , 'benchmark_exectype' => 'local' + , 'benchmark_exectype' => 'mapred' #HADOOP , 'mapredjars' => "$ENV{PH_ROOT}/lib" Modified: pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm (original) +++ pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm Wed Feb 22 09:43:41 2017 @@ -231,11 +231,6 @@ sub generateData 'rows' => 5000, 'hdfs' => "types/numbers.txt", }, { - 'name' => "biggish", - 'filetype' => "biggish", - 'rows' => 1000000, - 'hdfs' => "singlefile/biggish", - }, { 'name' => "prerank", 'filetype' => "ranking", 'rows' => 30, Modified: pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm (original) +++ pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm Wed Feb 22 09:43:41 2017 @@ -209,11 +209,21 @@ sub generateData 'filetype' => "ranking", 'rows' => 30, 'outfile' => "singlefile/prerank", + }, { + 'name' => "utf8Voter", + 'filetype' => "utf8Voter", + 'rows' => 30, + 'outfile' => "utf8Data/éæ°/utf8Voter", + }, { + 'name' => "utf8Student", + 'filetype' => "utf8Student", + 'rows' => 300, + 'outfile' => "utf8Data/å¦ç/utf8Student", } ); # Create the target directories - for my $dir ("singlefile", "dir", "types", "glob/star/somegood", + for my $dir ("singlefile", "utf8Data/éæ°", "utf8Data/å¦ç", "dir", "types", "glob/star/somegood", "glob/star/moregood", "glob/star/bad") { my @cmd = ("mkdir", "-p", "$cfg->{'inpathbase'}/$dir"); $self->runCmd($log, \@cmd); Modified: pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm (original) +++ pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm Wed Feb 22 09:43:41 2017 @@ -211,13 +211,6 @@ sub runTest $testCmd->{'pig'} = $testCmd->{'pig_win'}; } - if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) { - $oldpig = $testCmd->{'pig'}; - $testCmd->{'pig'} = $testCmd->{'pig23'}; - } - if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'expected_err_regex23'}) { - $testCmd->{'expected_err_regex'} = $testCmd->{'expected_err_regex23'}; - } my $res = $self->runPigCmdLine( $testCmd, $log, 1, $resources ); if ($oldpig) { $testCmd->{'pig'} = $oldpig; @@ -231,10 +224,6 @@ sub runTest $testCmd->{'pig'} = $testCmd->{'pig_win'}; } - if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) { - $oldpig = $testCmd->{'pig'}; - $testCmd->{'pig'} = $testCmd->{'pig23'}; - } my $res = $self->runPig( $testCmd, $log, 1, $resources ); if ($oldpig) { $testCmd->{'pig'} = $oldpig; @@ -686,9 +675,6 @@ sub generateBenchmark if ((Util::isWindows()||Util::isCygwin()) && $testCmd->{'pig_win'}) { $modifiedTestCmd{'pig'} = $testCmd->{'pig_win'}; } - if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) { - $modifiedTestCmd{'pig'} = $testCmd->{'pig23'}; - } # Change so we're looking at the old version of Pig if (defined $testCmd->{'oldpigpath'} && $testCmd->{'oldpigpath'} ne "") { $modifiedTestCmd{'pigpath'} = $testCmd->{'oldpigpath'}; @@ -1058,10 +1044,6 @@ sub wrongExecutionMode($$) } } - if (defined $testCmd->{'ignore23'} && $testCmd->{'hadoopversion'}=='23') { - $wrong = 1; - } - if ($wrong) { print $log "Skipping test $testCmd->{'group'}" . "_" . $testCmd->{'num'} . " since it is not suppsed to be run in hadoop 23\n"; Modified: pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl (original) +++ pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl Wed Feb 22 09:43:41 2017 @@ -73,7 +73,7 @@ while (<$input_handle>) { chomp; $data = $_; - if (defined(%hash) && (exists $hash{$data})) + if (exists $hash{$data}) { print $output_handle "$hash{$data}\n"; } Modified: pig/branches/spark/test/e2e/pig/tests/grunt.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/grunt.conf?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/grunt.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/grunt.conf Wed Feb 22 09:43:41 2017 @@ -46,7 +46,12 @@ $cfg = { 'execonly' => 'mapred,tez', # don't have a clue what their cwd will be for local mode 'expected_out_regex' => "/user", 'rc' => 0 - + },{ + 'num' => 3, + 'pig' => "ls .", + 'execonly' => 'mapred,tez', + 'expected_out_regex' => "/user", + 'rc' => 0 },{ 'num' => 4, 'pig' => "ls :INPATH:", @@ -77,21 +82,22 @@ $cfg = { 'rc' => 0 },{ 'num' => 10, - 'pig' => "cp :INPATH:/singlefile/studenttab10k . - ls .", + 'pig' => "mkdir :OUTPATH: + cp :INPATH:/singlefile/studenttab10k :OUTPATH: + ls :OUTPATH:", 'expected_out_regex' => ".*studenttab10k", 'rc' => 0 },{ 'num' => 11, - 'pig' => "cp :INPATH:/singlefile/studenttab10k ./fred - ls .", + 'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/fred + ls :OUTPATH:", 'expected_out_regex' => ".*fred", 'rc' => 0 },{ 'num' => 12, - 'pig' => "cp :INPATH:/singlefile/studenttab10k ./jim - mv ./jim ./bob - ls .", + 'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/jim + mv :OUTPATH:/jim :OUTPATH:/bob + ls :OUTPATH:", 'expected_out_regex' => ".*bob", 'rc' => 0 },{ @@ -103,18 +109,19 @@ $cfg = { },{ 'num' => 14, 'pig' => "copyToLocal :INPATH:/singlefile/votertab10k :TMP: - copyFromLocal :TMP:/votertab10k ./joe - cat ./joe", + copyFromLocal :TMP:/votertab10k :OUTPATH:/joe + cat :OUTPATH:/joe", 'expected_out_regex' => ":Grunt_14_output:", 'rc' => 0 },{ 'num' => 15, - 'pig' => "rm fred bob joe", - 'not_expected_out_regex' => "joe", + 'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/fred + rm :OUTPATH:/fred", + 'not_expected_out_regex' => "fred", 'rc' => 0 },{ 'num' => 16, - 'pig' => "rmf jill", + 'pig' => "rmf :OUTPATH:/jill", 'rc' => 0 } ] Modified: pig/branches/spark/test/e2e/pig/tests/hcat.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/hcat.conf?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/hcat.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/hcat.conf Wed Feb 22 09:43:41 2017 @@ -44,7 +44,7 @@ stored as textfile;\, 'num' => 2, 'java_params' => ['-Dhcat.bin=:HCATBIN:'], 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -SQL drop table if exists pig_hcat_ddl_1; +SQL drop table if exists pig_hcat_ddl_1 purge; sql create table pig_hcat_ddl_1(name string, age int, gpa double) @@ -55,6 +55,35 @@ store a into ':OUTPATH:';\, }, ] }, + { + 'name' => 'Jython_HCatDDL', + 'tests' => [ + { + # sql command + 'num' => 1 + ,'java_params' => ['-Dhcat.bin=:HCATBIN:'] + ,'pig' => q\#!/usr/bin/python +from org.apache.pig.scripting import Pig + +#create pig script + +Pig.sql("""sql drop table if exists pig_script_hcat_ddl_1;""") +ret = Pig.sql("""sql create table pig_script_hcat_ddl_1(name string, +age int, +gpa double) +stored as textfile; +""") + +if ret==0: + print "SQL command PASSED" + +else: + raise "SQL command FAILED" +\ + ,'rc' => 0 + }, + ] + }, ] } ; Added: pig/branches/spark/test/e2e/pig/tests/join.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/join.conf?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/join.conf (added) +++ pig/branches/spark/test/e2e/pig/tests/join.conf Wed Feb 22 09:43:41 2017 @@ -0,0 +1,310 @@ +#!/usr/bin/env perl +############################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +############################################################################### + +$cfg = { + 'driver' => 'Pig', + + 'groups' => [ + { + 'name' => 'BloomJoin_Map', + 'execonly' => 'tez', + 'tests' => [ + { + # Tuple join key + 'num' => 1, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +--c = filter a by age < 20; +--d = filter b by age < 20; +e = join a by (name, age), b by (name, age) using 'bloom'; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +--c = filter a by age < 20; +--d = filter b by age < 20; +e = join a by (name, age), b by (name, age); +store e into ':OUTPATH:';\, + }, + { + # bytearray join key + 'num' => 2, + 'pig' => q\ +SET mapreduce.input.fileinputformat.split.maxsize '50000'; +SET pig.splitCombination false; +a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +c = filter a by age < 20; +d = filter b by age < 20; +e = join c by name, d by name using 'bloom'; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +c = filter a by age < 20; +d = filter b by age < 20; +e = join c by name, d by name; +store e into ':OUTPATH:';\, + }, + { + # Left outer join and chararray join key + 'num' => 3, + 'pig' => q\ +SET mapreduce.input.fileinputformat.split.maxsize '50000'; +SET pig.splitCombination false; +a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions); +c = join a by name left, b by name using 'bloom'; +d = foreach c generate a::name, a::age, gpa, registration, contributions; +store d into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions); +c = join a by name left, b by name; +d = foreach c generate a::name, a::age, gpa, registration, contributions; +store d into ':OUTPATH:';\, + }, + { + # Right outer join + 'num' => 4, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions); +c = join a by (name,age) right, b by (name,age) using 'bloom'; +store c into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions); +c = join a by (name,age) right, b by (name,age); +store c into ':OUTPATH:';\, + }, + { + # Left input from a union + 'num' => 5, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa); +c = union a, b; +d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +d = filter d by age > 60; +e = join c by name, d by name using 'bloom' PARALLEL 3; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa); +c = union a, b; +d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +d = filter d by age > 60; +e = join c by name, d by name; +store e into ':OUTPATH:';\, + }, + { + # Right input from a union and integer join key + 'num' => 6, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa); +c = union a, b; +c = filter c by age > 75; +d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +e = join d by age, c by age using 'bloom' PARALLEL 3; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa); +c = union a, b; +c = filter c by age > 75; +d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +e = join d by age, c by age; +store e into ':OUTPATH:';\, + }, + { + # Left input from a split + 'num' => 7, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions); +b = filter b by age > 75; +c = filter a by age > 50; +d = join a by age, b by age using 'bloom'; +store c into ':OUTPATH:.1'; +store d into ':OUTPATH:.2';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions); +b = filter b by age > 75; +c = filter a by age > 50; +d = join a by age, b by age; +store c into ':OUTPATH:.1'; +store d into ':OUTPATH:.2';\, + }, + { + # Right input from a split + 'num' => 8, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions); +c = filter a by age > 75; +d = filter a by name == 'nick miller'; +e = join b by age, c by age using 'bloom'; +store d into ':OUTPATH:.1'; +store e into ':OUTPATH:.2';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions); +c = filter a by age > 75; +d = filter a by name == 'nick miller'; +e = join b by age, c by age; +store d into ':OUTPATH:.1'; +store e into ':OUTPATH:.2';\, + }, + ] # end of tests + }, + { + 'name' => 'BloomJoin_Reduce', + 'execonly' => 'tez', + 'java_params' => ['-Dpig.bloomjoin.strategy=reduce'], + 'tests' => [ + { + # Tuple join key + 'num' => 1, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +--c = filter a by age < 20; +--d = filter b by age < 20; +e = join a by (name, age), b by (name, age) using 'bloom'; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +--c = filter a by age < 20; +--d = filter b by age < 20; +e = join a by (name, age), b by (name, age); +store e into ':OUTPATH:';\, + }, + { + # bytearray join key + 'num' => 2, + 'pig' => q\ +SET mapreduce.input.fileinputformat.split.maxsize '50000'; +SET pig.splitCombination false; +a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +c = filter a by age < 20; +d = filter b by age < 20; +e = join c by name, d by name using 'bloom'; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +c = filter a by age < 20; +d = filter b by age < 20; +e = join c by name, d by name; +store e into ':OUTPATH:';\, + }, + { + # Left outer join and chararray join key + 'num' => 3, + 'pig' => q\ +SET mapreduce.input.fileinputformat.split.maxsize '50000'; +SET pig.splitCombination false; +a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions); +c = join a by name left, b by name using 'bloom'; +d = foreach c generate a::name, a::age, gpa, registration, contributions; +store d into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions); +c = join a by name left, b by name; +d = foreach c generate a::name, a::age, gpa, registration, contributions; +store d into ':OUTPATH:';\, + }, + { + # Right outer join + 'num' => 4, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions); +c = join a by (name,age) right, b by (name,age) using 'bloom'; +store c into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions); +c = join a by (name,age) right, b by (name,age); +store c into ':OUTPATH:';\, + }, + { + # Left input from a union + 'num' => 5, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa); +c = union a, b; +d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +d = filter d by age > 60; +e = join c by name, d by name using 'bloom' PARALLEL 3; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa); +c = union a, b; +d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +d = filter d by age > 60; +e = join c by name, d by name; +store e into ':OUTPATH:';\, + }, + { + # Right input from a union and integer join key + 'num' => 6, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa); +c = union a, b; +c = filter c by age > 75; +d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +e = join d by age, c by age using 'bloom' PARALLEL 3; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa); +c = union a, b; +c = filter c by age > 75; +d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +e = join d by age, c by age; +store e into ':OUTPATH:';\, + }, + { + # Left input from a split + 'num' => 7, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions); +b = filter b by age > 75; +c = filter a by age > 50; +d = join a by age, b by age using 'bloom'; +store c into ':OUTPATH:.1'; +store d into ':OUTPATH:.2';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions); +b = filter b by age > 75; +c = filter a by age > 50; +d = join a by age, b by age; +store c into ':OUTPATH:.1'; +store d into ':OUTPATH:.2';\, + }, + { + # Right input from a split + 'num' => 8, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions); +c = filter a by age > 75; +d = filter a by name == 'nick miller'; +e = join b by age, c by age using 'bloom'; +store d into ':OUTPATH:.1'; +store e into ':OUTPATH:.2';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions); +c = filter a by age > 75; +d = filter a by name == 'nick miller'; +e = join b by age, c by age; +store d into ':OUTPATH:.1'; +store e into ':OUTPATH:.2';\, + }, + ] # end of tests + } + ] # end of groups +}; \ No newline at end of file Modified: pig/branches/spark/test/e2e/pig/tests/multiquery.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/multiquery.conf?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/multiquery.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/multiquery.conf Wed Feb 22 09:43:41 2017 @@ -728,6 +728,52 @@ b = union a1, a2; c = rank b by name ASC, age DESC DENSE; store c into ':OUTPATH:';\, }, + { + # Union + Split + Two replicate join + 'num' => 12, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +a1 = filter a by gpa is null or gpa <= 3.9; +a2 = filter a by gpa < 2; +b = union a1, a2; +c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +c1 = filter c by age < 30; +c2 = filter c by age > 50; +d = join b by name, c1 by name using 'replicated'; +e = join d by b::name, c2 by name using 'replicated'; +store e into ':OUTPATH:';\, + }, + { + # Multiple Union + Multiple Split + Single store + 'num' => 13, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa); +u1 = union onschema a, b; +SPLIT u1 INTO r IF age < 30, s OTHERWISE; +c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions); +d = JOIN r BY name LEFT, c BY votername; +u2 = UNION ONSCHEMA d, s; +e = FILTER u2 BY name == 'nick miller'; +f = FILTER u2 BY age > 70 ; +u3 = UNION ONSCHEMA e, f; +store u3 into ':OUTPATH:';\, + }, + { + # PIG-5082. Similar to MultiQuery_Union_13 but for non-store vertex group + 'num' => 14, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); +b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa); +u1 = union onschema a, b; +SPLIT u1 INTO r IF age < 30, s OTHERWISE; +c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions); +d = JOIN r BY name LEFT, c BY votername; +u2 = UNION ONSCHEMA d, s; +e = FILTER u2 BY name == 'nick miller'; +f = FILTER u2 BY age > 70 ; +u3 = UNION ONSCHEMA e, f; +SPLIT u3 INTO t if age > 75, u OTHERWISE; +v = JOIN t BY name LEFT, c BY votername; +store v into ':OUTPATH:';\, + } ] # end of tests }, @@ -860,7 +906,38 @@ m = UNION e, i, j, n; n = JOIN a BY name, m BY name; store n into ':OUTPATH:';\, - } + }, + { + # Self join bloom left outer + 'num' => 12, + 'execonly' => 'tez', + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3.9; +c = filter a by gpa > 3; +d = join b by name left outer, c by name using 'bloom'; +store d into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3.9; +c = filter a by gpa > 3; +d = join b by name left outer, c by name; +store d into ':OUTPATH:';\, + }, + { + # Self join bloom left outer with strategy as reduce + 'num' => 13, + 'execonly' => 'tez', + 'java_params' => ['-Dpig.bloomjoin.strategy=reduce'], + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3.9; +c = filter a by gpa > 3; +d = join b by name left outer, c by name using 'bloom'; +store d into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3.9; +c = filter a by gpa > 3; +d = join b by name left outer, c by name; +store d into ':OUTPATH:';\, + }, ] # end of tests }, Modified: pig/branches/spark/test/e2e/pig/tests/negative.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/negative.conf?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/negative.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/negative.conf Wed Feb 22 09:43:41 2017 @@ -473,7 +473,7 @@ define CMD `perl PigStreaming.pl` input( A = load ':INPATH:/singlefile/studenttab10k'; B = stream A through CMD; store B into ':OUTPATH:';\, - 'expected_err_regex' => "Error reading output from Streaming binary", + 'expected_err_regex' => "Error reading output from Streaming binary|Error while reading from POStream and passing it to the streaming process", }, { # Invalid serializer - throws exception @@ -568,24 +568,7 @@ store D into ':OUTPATH:';\, 'expected_err_regex' => "Could not resolve StringStoreBad using imports", }, ] - }, - { - 'name' => 'LineageErrors', - 'tests' => [ - { - # UDF returns a bytearray that is cast to an integer - 'num' => 1, - 'pig' => q\register :FUNCPATH:/testudf.jar; -a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -b = filter a by name lt 'b'; -c = foreach b generate org.apache.pig.test.udf.evalfunc.CreateMap((chararray)name, age); -d = foreach c generate $0#'alice young'; -split d into e if $0 < 42, f if $0 >= 42; -store e into ':OUTPATH:';\, - 'expected_err_regex' => "Received a bytearray from the UDF or Union from two different Loaders. Cannot determine how to convert the bytearray to int", - }, - ] - } + } ] } ;
