Modified: pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java Fri Feb 24 03:34:37 2017 @@ -23,10 +23,6 @@ import java.net.URISyntaxException; import org.apache.pig.PigServer; import org.apache.pig.tools.DownloadResolver; -import org.apache.hadoop.conf.Configuration; -import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; -import org.apache.hadoop.fs.Path; public class RegisterResolver { @@ -70,24 +66,15 @@ public class RegisterResolver { String scheme = uri.getScheme(); if (scheme != null) { scheme = scheme.toLowerCase(); - if (scheme.equals("ivy")) { - DownloadResolver downloadResolver = DownloadResolver.getInstance(); - return downloadResolver.downloadArtifact(uri, pigServer); - } - if (!hasFileSystemImpl(uri)) { - throw new ParserException("Invalid Scheme: " + uri.getScheme()); - } } - return new URI[] { uri }; - } - - /** - * @param uri - * @return True if the uri has valid file system implementation - */ - private boolean hasFileSystemImpl(URI uri) { - Configuration conf = ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties(), true); - return HadoopShims.hasFileSystemImpl(new Path(uri), conf); + if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) { + return new URI[] { uri }; + } else if (scheme.equals("ivy")) { + DownloadResolver downloadResolver = DownloadResolver.getInstance(); + return downloadResolver.downloadArtifact(uri, pigServer); + } else { + throw new ParserException("Invalid Scheme: " + uri.getScheme()); + } } /**
Modified: pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java Fri Feb 24 03:34:37 2017 @@ -75,11 +75,12 @@ public class SourceLocation { if (node != null) { InvocationPoint pt = node.getNextInvocationPoint(); while (pt != null) { + sb.append("\n"); sb.append("at expanding macro '" + pt.getMacro() + "' (" + pt.getFile() + ":" + pt.getLine() + ")"); pt = node.getNextInvocationPoint(); - sb.append("\n"); } + sb.append("\n"); } sb.append( "<" ); if( file != null && !file.isEmpty() ) Modified: pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original) +++ pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java Fri Feb 24 03:34:37 2017 @@ -27,7 +27,6 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.Mapper; @@ -36,7 +35,6 @@ import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; -import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase; @@ -77,9 +75,9 @@ import org.apache.pig.pen.util.LineageTr * */ public class LocalMapReduceSimulator { - + private MapReduceLauncher launcher = new MapReduceLauncher(); - + private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();; @SuppressWarnings("unchecked") @@ -90,12 +88,12 @@ public class LocalMapReduceSimulator { PigContext pc) throws PigException, IOException, InterruptedException { phyToMRMap.clear(); MROperPlan mrp = launcher.compile(php, pc); - + ConfigurationValidator.validatePigProperties(pc.getProperties()); Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties()); - + JobControlCompiler jcc = new JobControlCompiler(pc, conf); - + JobControl jc; int numMRJobsCompl = 0; DataBag input; @@ -108,8 +106,6 @@ public class LocalMapReduceSimulator { boolean needFileInput; final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>(); pc.getProperties().setProperty("pig.illustrating", "true"); - String jtIdentifier = "" + System.currentTimeMillis(); - int jobId = 0; while(mrp.size() != 0) { jc = jcc.compile(mrp, "Illustrator"); if(jc == null) { @@ -117,7 +113,6 @@ public class LocalMapReduceSimulator { } List<Job> jobs = jc.getWaitingJobs(); for (Job job : jobs) { - jobId++; jobConf = job.getJobConf(); FileLocalizer.setInitialized(false); ArrayList<ArrayList<OperatorKey>> inpTargets = @@ -128,14 +123,14 @@ public class LocalMapReduceSimulator { PigSplit split = null; List<POStore> stores = null; PhysicalOperator pack = null; - // revisit as there are new physical operators from MR compilation + // revisit as there are new physical operators from MR 
compilation if (!mro.mapPlan.isEmpty()) attacher.revisit(mro.mapPlan); if (!mro.reducePlan.isEmpty()) { attacher.revisit(mro.reducePlan); pack = mro.reducePlan.getRoots().get(0); } - + List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class); if (!mro.mapPlan.isEmpty()) { stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class); @@ -150,10 +145,10 @@ public class LocalMapReduceSimulator { for (POStore store : stores) { output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store)); } - + OutputAttacher oa = new OutputAttacher(mro.mapPlan, output); oa.visit(); - + if (!mro.reducePlan.isEmpty()) { oa = new OutputAttacher(mro.reducePlan, output); oa.visit(); @@ -173,7 +168,6 @@ public class LocalMapReduceSimulator { if (input != null) mro.mapPlan.remove(ld); } - int mapTaskId = 0; for (POLoad ld : lds) { // check newly generated data first input = output.get(ld.getLFile().getFileName()); @@ -186,7 +180,7 @@ public class LocalMapReduceSimulator { break; } } - } + } } needFileInput = (input == null); split = new PigSplit(null, index, needFileInput ? 
emptyInpTargets : inpTargets.get(index), 0); @@ -205,7 +199,6 @@ public class LocalMapReduceSimulator { context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split); } ((PigMapBase) map).setMapPlan(mro.mapPlan); - context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString()); map.run(context); } else { if ("true".equals(jobConf.get("pig.usercomparator"))) @@ -217,11 +210,10 @@ public class LocalMapReduceSimulator { Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map) .getIllustratorContext(jobConf, input, intermediateData, split); ((PigMapBase) map).setMapPlan(mro.mapPlan); - context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString()); map.run(context); } } - + if (!mro.reducePlan.isEmpty()) { if (pack instanceof POPackage) @@ -241,20 +233,19 @@ public class LocalMapReduceSimulator { } ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan); - context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString()); reduce.run(context); } for (PhysicalOperator key : mro.phyToMRMap.keySet()) for (PhysicalOperator value : mro.phyToMRMap.get(key)) phyToMRMap.put(key, value); } - - + + int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>()); - + numMRJobsCompl += removedMROp; } - + jcc.reset(); } @@ -265,7 +256,7 @@ public class LocalMapReduceSimulator { plan)); this.outputBuffer = output; } - + @Override public void visitUserFunc(POUserFunc userFunc) throws VisitorException { if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) { Modified: pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java?rev=1784224&r1=1784223&r2=1784224&view=diff 
============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java Fri Feb 24 03:34:37 2017 @@ -38,7 +38,6 @@ import java.util.regex.Pattern; import org.apache.hadoop.util.Shell; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.util.UDFContext; import org.apache.pig.tools.pigstats.PigStats; /** @@ -128,9 +127,7 @@ public abstract class ScriptEngine { //protected static InputStream getScriptAsStream(String scriptPath) { InputStream is = null; File file = new File(scriptPath); - // In the frontend give preference to the local file. - // In the backend, try the jar first - if (UDFContext.getUDFContext().isFrontend() && file.exists()) { + if (file.exists()) { try { is = new FileInputStream(file); } catch (FileNotFoundException e) { @@ -159,14 +156,7 @@ public abstract class ScriptEngine { } } } - if (is == null && file.exists()) { - try { - is = new FileInputStream(file); - } catch (FileNotFoundException e) { - throw new IllegalStateException("could not find existing file "+scriptPath, e); - } - } - + // TODO: discuss if we want to add logic here to load a script from HDFS if (is == null) { Modified: pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java Fri Feb 24 03:34:37 2017 @@ -95,7 +95,7 @@ public class JsFunction extends EvalFunc private void debugConvertPigToJS(int depth, String pigType, Object value, Schema schema) { if 
(LOG.isDebugEnabled()) { - LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + toString(value) + " using " + stringify(schema)); + LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + value + " using " + stringify(schema)); } } Modified: pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java Fri Feb 24 03:34:37 2017 @@ -54,7 +54,7 @@ public class JythonFunction extends Eval try { f = JythonScriptEngine.getFunction(filename, functionName); this.function = f; - num_parameters = ((PyBaseCode) f.__code__).co_argcount; + num_parameters = ((PyBaseCode) f.func_code).co_argcount; PyObject outputSchemaDef = f.__findattr__("outputSchema".intern()); if (outputSchemaDef != null) { this.schema = Utils.getSchemaFromString(outputSchemaDef.toString()); @@ -105,7 +105,7 @@ public class JythonFunction extends Eval @Override public Object exec(Tuple tuple) throws IOException { try { - if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.__code__).varargs)) { + if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.func_code).varargs)) { // ignore input tuple PyObject out = function.__call__(); return JythonUtils.pythonToPig(out); Modified: pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java (original) +++ 
pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java Fri Feb 24 03:34:37 2017 @@ -44,6 +44,8 @@ public class DownloadResolver { private static DownloadResolver downloadResolver = new DownloadResolver(); private DownloadResolver() { + System.setProperty("groovy.grape.report.downloads", "true"); + if (System.getProperty("grape.config") != null) { LOG.info("Using ivysettings file from " + System.getProperty("grape.config")); } else { Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java Fri Feb 24 03:34:37 2017 @@ -20,7 +20,7 @@ package org.apache.pig.tools.grunt; import java.io.BufferedReader; import java.util.ArrayList; -import jline.console.ConsoleReader; +import jline.ConsoleReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,8 +52,8 @@ public class Grunt public void setConsoleReader(ConsoleReader c) { - c.addCompleter(new PigCompletorAliases(pig)); - c.addCompleter(new PigCompletor()); + c.addCompletor(new PigCompletorAliases(pig)); + c.addCompletor(new PigCompletor()); parser.setConsoleReader(c); } Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Fri Feb 24 03:34:37 2017 @@ -26,6 +26,7 @@ import java.io.FileReader; import 
java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.io.PrintStream; import java.io.Reader; import java.io.StringReader; @@ -41,7 +42,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import jline.console.ConsoleReader; +import jline.ConsoleReader; +import jline.ConsoleReaderInputStream; import org.apache.commons.io.output.NullOutputStream; import org.apache.commons.logging.Log; @@ -262,7 +264,7 @@ public class GruntParser extends PigScri public void prompt() { if (mInteractive) { - mConsoleReader.setPrompt("grunt> "); + mConsoleReader.setDefaultPrompt("grunt> "); } } @@ -514,13 +516,8 @@ public class GruntParser extends PigScri ConsoleReader reader; boolean interactive; - PigContext pc = mPigServer.getPigContext(); - - if( !loadOnly ) { - pc.getPreprocessorContext().paramScopePush(); - } - pc.setParams(params); - pc.setParamFiles(files); + mPigServer.getPigContext().setParams(params); + mPigServer.getPigContext().setParamFiles(files); try { FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script); @@ -531,7 +528,7 @@ public class GruntParser extends PigScri cmds = cmds.replaceAll("\t"," "); reader = new ConsoleReader(new ByteArrayInputStream(cmds.getBytes()), - System.out); + new OutputStreamWriter(System.out)); reader.setHistory(mConsoleReader.getHistory()); InputStream in = new ConsoleReaderInputStream(reader); inputReader = new BufferedReader(new InputStreamReader(in)); @@ -563,9 +560,6 @@ public class GruntParser extends PigScri if (interactive) { System.out.println(""); } - if( ! 
loadOnly ) { - pc.getPreprocessorContext().paramScopePop(); - } } @Override Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java Fri Feb 24 03:34:37 2017 @@ -33,9 +33,9 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import jline.console.completer.Completer; +import jline.Completor; -public class PigCompletor implements Completer { +public class PigCompletor implements Completor { private final Log log = LogFactory.getLog(getClass()); Set<String> candidates; static final String AUTOCOMPLETE_FILENAME = "autocomplete"; Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java Fri Feb 24 03:34:37 2017 @@ -26,11 +26,12 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.apache.pig.PigServer; -import jline.console.completer.Completer; +import jline.Completor; -public class PigCompletorAliases implements Completer { +public class PigCompletorAliases implements Completor { private final Log log = LogFactory.getLog(getClass()); Set<String> keywords; PigServer pig; Modified: 
pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj (original) +++ pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj Fri Feb 24 03:34:37 2017 @@ -259,11 +259,8 @@ TOKEN : <PIGDEFAULT: "%default" > } - TOKEN : { - <REGISTER: "register"> : IN_REGISTER - | <IDENTIFIER: (<SPECIALCHAR>)*<LETTER>(<DIGIT> | <LETTER> | <SPECIALCHAR>)*> | <LITERAL: ("\"" ((~["\""])*("\\\"")?)* "\"")|("'" ((~["'"])*("\\\'")?)* "'") > @@ -279,14 +276,7 @@ TOKEN : <OTHER: (~["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"])+ > | <NOT_OTHER_CHAR: ["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"] > -} - -<IN_REGISTER> MORE : { " " | "\t" | "\r" | "\n"} - -<IN_REGISTER> TOKEN: { - <PATH: (~["(", ")", ";", "\r", " ", "\t", "\n"])+> { - matchedToken.image = image.toString(); - }: DEFAULT + } void Parse() throws IOException : {} @@ -298,7 +288,6 @@ void input() throws IOException : { String s; Token strTok = null; - Token strTok2 = null; } { strTok = <PIG> @@ -319,20 +308,6 @@ void input() throws IOException : { pc.validate(strTok.toString()); } ) | - strTok = <REGISTER> - strTok2 = <PATH> {} - { - // Adding a special case for register since it handles "/*" globbing - // and this conflicts with general multi-line comment "/* */". - // See the comment above on OTHERS on how tokenizer matches the longest - // match. Here, string next to "register" is treated as PATH TOKEN - // and therefore not consider "/*" as part of the comment - // (and avoid the longest match problem). 
- out.append(strTok.image); - String sub_line = pc.substitute(strTok2.image); - out.append(sub_line); - } - | s = paramString(){} { //process an ordinary pig line - perform substitution Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java Fri Feb 24 03:34:37 2017 @@ -27,8 +27,6 @@ import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.StringReader; -import java.util.ArrayDeque; -import java.util.Deque; import java.util.Hashtable; import java.util.List; import java.util.Map; @@ -42,26 +40,20 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.validator.BlackAndWhitelistFilter; import org.apache.pig.validator.PigCommandFilter; +import org.python.google.common.base.Preconditions; public class PreprocessorContext { - private int tableinitsize = 10; - private Deque<Map<String,String>> param_val_stack; + private Map<String, String> param_val; + // used internally to detect when a param is set multiple times, + // but it set with the same value so it's ok not to log a warning + private Map<String, String> param_source; + private PigContext pigContext; public Map<String, String> getParamVal() { - Map <String, String> ret = new Hashtable <String, String>(tableinitsize); - - //stack (deque) iterates LIFO - for (Map <String, String> map : param_val_stack ) { - for (Map.Entry<String, String> entry : map.entrySet()) { - if( ! 
ret.containsKey(entry.getKey()) ) { - ret.put(entry.getKey(), entry.getValue()); - } - } - } - return ret; + return param_val; } private final Log log = LogFactory.getLog(getClass()); @@ -71,15 +63,24 @@ public class PreprocessorContext { * smaller number only impacts performance */ public PreprocessorContext(int limit) { - tableinitsize = limit; - param_val_stack = new ArrayDeque<Map<String,String>> (); - param_val_stack.push(new Hashtable<String, String> (tableinitsize)); + param_val = new Hashtable<String, String> (limit); + param_source = new Hashtable<String, String> (limit); + } + + public PreprocessorContext(Map<String, String> paramVal) { + param_val = paramVal; + param_source = new Hashtable<String, String>(paramVal); } public void setPigContext(PigContext context) { this.pigContext = context; } + /* + public void processLiteral(String key, String val) { + processLiteral(key, val, true); + } */ + /** * This method generates parameter value by running specified command * @@ -101,35 +102,20 @@ public class PreprocessorContext { processOrdLine(key, val, true); } - public void paramScopePush() { - param_val_stack.push( new Hashtable<String, String> (tableinitsize) ); - } - - public void paramScopePop() { - param_val_stack.pop(); - } - - public boolean paramval_containsKey(String key) { - for (Map <String, String> map : param_val_stack ) { - if( map.containsKey(key) ) { - return true; - } - } - return false; - } + /* + public void processLiteral(String key, String val, Boolean overwrite) { - public String paramval_get(String key) { - for (Map <String, String> map : param_val_stack ) { - if( map.containsKey(key) ) { - return map.get(key); + if (param_val.containsKey(key)) { + if (overwrite) { + log.warn("Warning : Multiple values found for " + key + ". 
Using value " + val); + } else { + return; } } - return null; - } - public void paramval_put(String key, String value) { - param_val_stack.peek().put(key, value); - } + String sub_val = substitute(val); + param_val.put(key, sub_val); + } */ /** * This method generates parameter value by running specified command @@ -143,21 +129,21 @@ public class PreprocessorContext { filter.validate(PigCommandFilter.Command.SH); } - if (paramval_containsKey(key) && !overwrite) { - return; + if (param_val.containsKey(key)) { + if (param_source.get(key).equals(val) || !overwrite) { + return; + } else { + log.warn("Warning : Multiple values found for " + key + + ". Using value " + val); + } } + param_source.put(key, val); + val = val.substring(1, val.length()-1); //to remove the backticks String sub_val = substitute(val); sub_val = executeShellCommand(sub_val); - - if (paramval_containsKey(key) && !paramval_get(key).equals(sub_val) ) { - //(boolean overwrite is always true here) - log.warn("Warning : Multiple values found for " + key + " command `" + val + "`. " - + "Previous value " + paramval_get(key) + ", now using value " + sub_val); - } - - paramval_put(key, sub_val); + param_val.put(key, sub_val); } public void validate(String preprocessorCmd) throws FrontendException { @@ -189,18 +175,18 @@ public class PreprocessorContext { */ public void processOrdLine(String key, String val, Boolean overwrite) throws ParameterSubstitutionException { - String sub_val = substitute(val, key); - if (paramval_containsKey(key)) { - if (paramval_get(key).equals(sub_val) || !overwrite) { + if (param_val.containsKey(key)) { + if (param_source.get(key).equals(val) || !overwrite) { return; } else { - log.warn("Warning : Multiple values found for " + key - + ". Previous value " + paramval_get(key) - + ", now using value " + sub_val); + log.warn("Warning : Multiple values found for " + key + ". 
Using value " + val); } } - paramval_put(key, sub_val); + param_source.put(key, val); + + String sub_val = substitute(val, key); + param_val.put(key, sub_val); } @@ -332,7 +318,7 @@ public class PreprocessorContext { while (bracketKeyMatcher.find()) { if ( (bracketKeyMatcher.start() == 0) || (line.charAt( bracketKeyMatcher.start() - 1)) != '\\' ) { key = bracketKeyMatcher.group(1); - if (!(paramval_containsKey(key))) { + if (!(param_val.containsKey(key))) { String message; if (parentKey == null) { message = "Undefined parameter : " + key; @@ -341,7 +327,7 @@ public class PreprocessorContext { } throw new ParameterSubstitutionException(message); } - val = paramval_get(key); + val = param_val.get(key); if (val.contains("$")) { val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$"); } @@ -359,7 +345,7 @@ public class PreprocessorContext { // for escaped vars of the form \$<id> if ( (keyMatcher.start() == 0) || (line.charAt( keyMatcher.start() - 1)) != '\\' ) { key = keyMatcher.group(1); - if (!(paramval_containsKey(key))) { + if (!(param_val.containsKey(key))) { String message; if (parentKey == null) { message = "Undefined parameter : " + key; @@ -368,7 +354,7 @@ public class PreprocessorContext { } throw new ParameterSubstitutionException(message); } - val = paramval_get(key); + val = param_val.get(key); if (val.contains("$")) { val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$"); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Feb 24 03:34:37 2017 @@ -23,7 +23,6 @@ options { STATIC = false; // Case is ignored in keywords 
IGNORE_CASE = true; - // DEBUG_PARSER = true; JAVA_UNICODE_ESCAPE = true; } @@ -37,7 +36,7 @@ import java.util.List; import java.util.ArrayList; import org.apache.pig.impl.util.StringUtils; -import jline.console.ConsoleReader; +import jline.ConsoleReader; public abstract class PigScriptParser { @@ -218,7 +217,7 @@ TOKEN_MGR_DECLS : { { /*System.err.print(">> "); System.err.flush();*/ - consoleReader.setPrompt(">> "); + consoleReader.setDefaultPrompt(">> "); } } @@ -268,7 +267,7 @@ TOKEN_MGR_DECLS : { <"'"> {prevState = PIG_START;} : IN_STRING | <"`"> {prevState = PIG_START;} : IN_COMMAND | <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = PIG_START;} : SCHEMA_DEFINITION -| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+ > {prevState = PIG_START;} : GENERATE +| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+ > {prevState = PIG_START;} : GENERATE | <"{"> {pigBlockLevel = 1;} : IN_BLOCK | <"}"> {if (true) throw new TokenMgrError("Unmatched '}'", TokenMgrError.LEXICAL_ERROR);} | <";"> : PIG_END @@ -293,8 +292,7 @@ TOKEN_MGR_DECLS : { <IN_STRING> MORE : { - <"\\\\"> -| <"\\'"> + <"\\'"> | <"'"> { SwitchTo(prevState);} | <("\n" | "\r" | "\r\n")> {secondary_prompt();} | <(~[])> @@ -397,7 +395,7 @@ TOKEN_MGR_DECLS : { { <"\""> {prevState = IN_BLOCK;} : IN_DOUBLE_QUOTED_STRING | <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = IN_BLOCK;} : SCHEMA_DEFINITION -| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+> {prevState = IN_BLOCK;} : GENERATE +| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+> {prevState = IN_BLOCK;} : GENERATE | <"{"> {pigBlockLevel++;} | <"}"(";")?> {pigBlockLevel--; if (pigBlockLevel == 0) SwitchTo(PIG_END);} | <"'"> {prevState = IN_BLOCK;} : IN_STRING Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java Fri Feb 24 03:34:37 2017 @@ -147,11 +147,6 @@ final class EmbeddedPigStats extends Pig } @Override - public String getDisplayString() { - return null; - } - - @Override public long getProactiveSpillCountObjects() { throw new UnsupportedOperationException(); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java Fri Feb 24 03:34:37 2017 @@ -87,11 +87,6 @@ public class EmptyPigStats extends PigSt } @Override - public String getDisplayString() { - return null; - } - - @Override public JobGraph getJobGraph() { return emptyJobPlan; } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java Fri Feb 24 03:34:37 2017 @@ -134,11 +134,6 @@ public abstract class PigStats { } /** - * Returns the display 
message in pig grunt - */ - public abstract String getDisplayString(); - - /** * Returns the DAG of jobs spawned by the script */ public JobGraph getJobGraph() { @@ -270,13 +265,6 @@ public abstract class PigStats { return ScriptState.get().getPigVersion(); } - /** - * Returns the contents of the script that was run. - */ - public String getScript() { - return ScriptState.get().getScript(); - } - public String getScriptId() { return ScriptState.get().getId(); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Feb 24 03:34:37 2017 @@ -24,7 +24,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats; /** @@ -71,7 +71,7 @@ public class PigStatsUtil { */ @Deprecated public static final String FS_COUNTER_GROUP - = MRPigStatsUtil.FS_COUNTER_GROUP; + = HadoopShims.getFsCounterGroupName(); /** * Returns an empty PigStats object Use of this method is not advised as it Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original) 
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Feb 24 03:34:37 2017 @@ -133,8 +133,6 @@ public abstract class ScriptState { MERGE_SPARSE_JOIN, REPLICATED_JOIN, SKEWED_JOIN, - BUILD_BLOOM, - FILTER_BLOOM, HASH_JOIN, COLLECTED_GROUP, MERGE_COGROUP, @@ -314,7 +312,7 @@ public abstract class ScriptState { maxScriptSize = Integer.valueOf(prop); } } - + this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize) : script; @@ -487,10 +485,6 @@ public abstract class ScriptState { public void visit(LOJoin op) { if (op.getJoinType() == JOINTYPE.HASH) { feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); - } else if (op.getJoinType() == JOINTYPE.BLOOM) { - feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); - feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal()); - feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal()); } else if (op.getJoinType() == JOINTYPE.MERGE) { feature.set(PIG_FEATURE.MERGE_JOIN.ordinal()); } else if (op.getJoinType() == JOINTYPE.MERGESPARSE) { @@ -512,7 +506,6 @@ public abstract class ScriptState { feature.set(PIG_FEATURE.RANK.ordinal()); } - @Override public void visit(LOSort op) { feature.set(PIG_FEATURE.ORDER_BY.ordinal()); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Fri Feb 24 03:34:37 2017 @@ -32,16 +32,15 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapreduce.Cluster; -import 
org.apache.hadoop.mapreduce.TaskReport; +import org.apache.hadoop.mapred.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.pig.PigConfiguration; import org.apache.pig.PigCounters; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.impl.io.FileSpec; @@ -54,8 +53,6 @@ import org.apache.pig.tools.pigstats.Out import org.apache.pig.tools.pigstats.PigStats.JobGraph; import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter; -import org.python.google.common.collect.Lists; - /** * This class encapsulates the runtime statistics of a MapReduce job. 
@@ -284,7 +281,7 @@ public final class MRJobStats extends Jo void addCounters(Job job) { try { - counters = getCounters(job); + counters = HadoopShims.getCounters(job); } catch (IOException e) { LOG.warn("Unable to get job counters", e); } @@ -352,13 +349,13 @@ public final class MRJobStats extends Jo void addMapReduceStatistics(Job job) { Iterator<TaskReport> maps = null; try { - maps = getTaskReports(job, TaskType.MAP); + maps = HadoopShims.getTaskReports(job, TaskType.MAP); } catch (IOException e) { LOG.warn("Failed to get map task report", e); } Iterator<TaskReport> reduces = null; try { - reduces = getTaskReports(job, TaskType.REDUCE); + reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE); } catch (IOException e) { LOG.warn("Failed to get reduce task report", e); } @@ -518,35 +515,4 @@ public final class MRJobStats extends Jo inputs.add(is); } - public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException { - if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) { - LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID()); - return null; - } - Cluster cluster = new Cluster(job.getJobConf()); - try { - org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID()); - if (mrJob == null) { // In local mode, mrJob will be null - mrJob = job.getJob(); - } - org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type); - return Lists.newArrayList(reports).iterator(); - } catch (InterruptedException ir) { - throw new IOException(ir); - } - } - - public static Counters getCounters(Job job) throws IOException { - try { - Cluster cluster = new Cluster(job.getJobConf()); - org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID()); - if (mrJob == null) { // In local mode, mrJob will be null - mrJob = job.getJob(); - } - return new Counters(mrJob.getCounters()); - } catch (Exception ir) { - throw new IOException(ir); - } - } - } Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Fri Feb 24 03:34:37 2017 @@ -33,6 +33,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.classification.InterfaceAudience.Private; import org.apache.pig.impl.PigContext; import org.apache.pig.tools.pigstats.JobStats; @@ -50,7 +51,7 @@ public class MRPigStatsUtil extends PigS public static final String TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter"; public static final String FS_COUNTER_GROUP - = "org.apache.hadoop.mapreduce.FileSystemCounter"; + = HadoopShims.getFsCounterGroupName(); private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class); Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java Fri Feb 24 03:34:37 2017 @@ -207,18 +207,13 @@ 
public final class SimplePigStats extend } void display() { - LOG.info(getDisplayString()); - } - - @Override - public String getDisplayString() { if (returnCode == ReturnCode.UNKNOWN) { LOG.warn("unknown return code, can't display the results"); - return ""; + return; } if (pigContext == null) { LOG.warn("unknown exec type, don't display the results"); - return ""; + return; } SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); @@ -281,7 +276,7 @@ public final class SimplePigStats extend sb.append("\nJob DAG:\n").append(jobPlan.toString()); - return "Script Statistics: \n" + sb.toString(); + LOG.info("Script Statistics: \n" + sb.toString()); } void mapMROperToJob(MapReduceOper mro, Job job) { Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Fri Feb 24 03:34:37 2017 @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +40,7 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.ScriptState; import org.apache.spark.api.java.JavaSparkContext; @@ -113,35 +115,29 @@ public class SparkPigStats extends PigSt } private void display() { - LOG.info(getDisplayString()); - } - - public 
String getDisplayString() { - StringBuilder sb = new StringBuilder(); Iterator<JobStats> iter = jobPlan.iterator(); while (iter.hasNext()) { - SparkJobStats js = (SparkJobStats) iter.next(); + SparkJobStats js = (SparkJobStats)iter.next(); if (jobSparkOperatorMap.containsKey(js)) { SparkOperator sparkOperator = jobSparkOperatorMap.get(js); js.setAlias(sparkOperator); } - sb.append("Spark Job [" + js.getJobId() + "] Metrics"); + LOG.info( "Spark Job [" + js.getJobId() + "] Metrics"); Map<String, Long> stats = js.getStats(); if (stats == null) { - sb.append("No statistics found for job " + js.getJobId()); - return sb.toString(); + LOG.info("No statistics found for job " + js.getJobId()); + return; } Iterator statIt = stats.entrySet().iterator(); while (statIt.hasNext()) { - Map.Entry pairs = (Map.Entry) statIt.next(); - sb.append("\t" + pairs.getKey() + " : " + pairs.getValue()); + Map.Entry pairs = (Map.Entry)statIt.next(); + LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue()); } - for (InputStats inputStat : js.getInputs()) { - sb.append("\t" + inputStat.getDisplayString()); + for (InputStats inputStat : js.getInputs()){ + LOG.info("\t"+inputStat.getDisplayString()); } } - return sb.toString(); } @Override @@ -221,4 +217,4 @@ public class SparkPigStats extends PigSt sparkOperatorsSet.add(sparkOperator); } -} \ No newline at end of file +} Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Fri Feb 24 03:34:37 2017 @@ -18,8 +18,12 @@ package org.apache.pig.tools.pigstats.spark; -import 
org.apache.hadoop.mapred.JobConf; +import java.util.List; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener; import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; @@ -97,7 +101,21 @@ public class SparkStatsUtil { public static long getLoadSparkCounterValue(POLoad load) { SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance(); - return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load)); + int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan()); + return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load))/loadersCount; + } + + private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){ + List<PhysicalOperator> successors = pp.getSuccessors(op); + if (successors == null || successors.size()==0) return 1; + for (PhysicalOperator successor : successors){ + if (successor instanceof POSplit){ + return ((POSplit)successor).getPlans().size(); + }else{ + return countCoLoadsIfInSplit(successor,pp); + } + } + return 1; } public static boolean isJobSuccess(int jobID, Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Fri Feb 24 03:34:37 2017 @@ -245,11 +245,7 @@ public class TezDAGStats extends JobStat OutputStats existingOut = outputsByLocation.get(output.getLocation()); // In case of multistore, bytesWritten is already calculated // from size of all the files in the output directory. - // So use that if there is a combination of multistore and single store - if (output.getPOStore().isMultiStore()) { - existingOut.setBytes(output.getBytes()); - existingOut.setPOStore(output.getPOStore()); - } else if (!existingOut.getPOStore().isMultiStore() && output.getBytes() > -1) { + if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) { long bytes = existingOut.getBytes() > -1 ? (existingOut.getBytes() + output.getBytes()) : output.getBytes(); Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Fri Feb 24 03:34:37 2017 @@ -117,11 +117,6 @@ public class TezPigScriptStats extends P } private void display() { - LOG.info(getDisplayString()); - } - - @Override - public String getDisplayString() { SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); StringBuilder sb = new StringBuilder(); sb.append("\n"); @@ -175,7 +170,7 @@ public class TezPigScriptStats extends P for (OutputStats os : getOutputStats()) { sb.append(os.getDisplayString().trim()).append("\n"); } - return "Script Statistics:\n" + sb.toString(); + LOG.info("Script Statistics:\n" + sb.toString()); } /** 
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Fri Feb 24 03:34:37 2017 @@ -275,12 +275,6 @@ public class TezScriptState extends Scri if (tezOp.isRegularJoin()) { feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); } - if (tezOp.isBuildBloom()) { - feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal()); - } - if (tezOp.isFilterBloom()) { - feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal()); - } if (tezOp.isUnion()) { feature.set(PIG_FEATURE.UNION.ordinal()); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Feb 24 03:34:37 2017 @@ -22,7 +22,6 @@ import static org.apache.pig.tools.pigst import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -290,19 +289,13 @@ public class TezVertexStats extends JobS } // Split followed by union will have multiple stores writing to same location - Map<String, List<POStore>> uniqueOutputs = new HashMap<String, List<POStore>>(); + Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>(); for 
(POStore sto : stores) { POStoreTez store = (POStoreTez) sto; - List<POStore> stores = uniqueOutputs.get(store.getOutputKey()); - if (stores == null) { - stores = new ArrayList<POStore>(); - } - stores.add(store); - uniqueOutputs.put(store.getOutputKey(), stores); + uniqueOutputs.put(store.getOutputKey(), store); } - for (List<POStore> stores : uniqueOutputs.values()) { - POStore sto = stores.get(0); + for (POStore sto : uniqueOutputs.values()) { if (sto.isTmpStore()) { continue; } @@ -311,16 +304,11 @@ public class TezVertexStats extends JobS String filename = sto.getSFile().getFileName(); if (counters != null) { if (msGroup != null) { - long n = 0; - Long val = null; - for (POStore store : stores) { - val = msGroup.get(PigStatsUtil.getMultiStoreCounterName(store)); - // Tez removes 0 value counters for efficiency. - if (val != null) { - n += val; - }; - } - records = n; + Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto)); + if (n != null) records = n; + } + if (records == -1) { + records = outputRecords; } if (isSuccessful() && records == -1) { // Tez removes 0 value counters for efficiency. @@ -350,13 +338,13 @@ public class TezVertexStats extends JobS @Override @Deprecated public int getNumberMaps() { - return this.isMapOpts ? numTasks : 0; + return this.isMapOpts ? numTasks : -1; } @Override @Deprecated public int getNumberReduces() { - return this.isMapOpts ? 0 : numTasks; + return this.isMapOpts ? -1 : numTasks; } @Override @@ -398,25 +386,25 @@ public class TezVertexStats extends JobS @Override @Deprecated public long getMapInputRecords() { - return this.isMapOpts ? numInputRecords : 0; + return this.isMapOpts ? numInputRecords : -1; } @Override @Deprecated public long getMapOutputRecords() { - return this.isMapOpts ? numOutputRecords : 0; + return this.isMapOpts ? numOutputRecords : -1; } @Override @Deprecated public long getReduceInputRecords() { - return numReduceInputRecords; + return this.isMapOpts ? 
-1 : numInputRecords; } @Override @Deprecated public long getReduceOutputRecords() { - return this.isMapOpts ? 0 : numOutputRecords; + return this.isMapOpts ? -1 : numOutputRecords; } @Override Modified: pig/branches/spark/src/pig-default.properties URL: http://svn.apache.org/viewvc/pig/branches/spark/src/pig-default.properties?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/pig-default.properties (original) +++ pig/branches/spark/src/pig-default.properties Fri Feb 24 03:34:37 2017 @@ -61,8 +61,4 @@ pig.stats.output.size.reader.unsupported pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage -pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage - -pig.ats.enabled=true - -pig.tez.configure.am.memory=true +pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage \ No newline at end of file Modified: pig/branches/spark/test/e2e/pig/build.xml URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/build.xml?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/build.xml (original) +++ pig/branches/spark/test/e2e/pig/build.xml Fri Feb 24 03:34:37 2017 @@ -27,8 +27,9 @@ <property name="hive.lib.dir" value="${pig.base.dir}/build/ivy/lib/Pig"/> - <property name="hadoopversion" value="2" /> - <property name="hive.hadoop.shims.version" value="0.23" /> + <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S"> + <equals arg1="${hadoopversion}" arg2="23" /> + </condition> <property name="mvnrepo" value="http://repo2.maven.org/maven2"/> @@ -60,7 
+61,6 @@ <property name="harness.PH_LOCAL" value="."/> <property name="harness.PH_OUT" value="."/> <property name="harness.PERL5LIB" value="./libexec"/> - <property name="harness.user.home" value="/user/pig" /> <property name="test.location" value="${basedir}/testdist"/> <property name="benchmark.location" value="${test.location}/benchmarks"/> @@ -137,7 +137,6 @@ <path path="${test.location}/tests/multiquery.conf"/> <path path="${test.location}/tests/negative.conf"/> <path path="${test.location}/tests/nightly.conf"/> - <path path="${test.location}/tests/join.conf"/> <path path="${test.location}/tests/streaming.conf"/> <path path="${test.location}/tests/streaming_local.conf"/> <path path="${test.location}/tests/turing_jython.conf"/> @@ -310,7 +309,6 @@ <env key="PH_HIVE_LIB_DIR" value="${hive.lib.dir}"/> <env key="PH_HIVE_VERSION" value="${hive.version}"/> <env key="PH_HIVE_SHIMS_VERSION" value="${hive.hadoop.shims.version}"/> - <env key="PH_HDFS_BASE" value="${harness.user.home}" /> <env key="HARNESS_CONF" value="${harness.conf.file}"/> <env key="HADOOP_HOME" value="${harness.hadoop.home}"/> <env key="HADOOP_PREFIX" value="${HADOOP_PREFIX}"/> @@ -371,7 +369,6 @@ <env key="PH_CLUSTER_BIN" value="${harness.cluster.bin}"/> <env key="HARNESS_CONF" value="${harness.conf.file}"/> <env key="HADOOP_HOME" value="${harness.hadoop.home}"/> - <env key="PH_HDFS_BASE" value="${harness.user.home}" /> <arg value="./test_harness.pl"/> <arg value="-deploycfg"/> Modified: pig/branches/spark/test/e2e/pig/conf/spark.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/spark.conf?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/conf/spark.conf (original) +++ pig/branches/spark/test/e2e/pig/conf/spark.conf Fri Feb 24 03:34:37 2017 @@ -30,8 +30,8 @@ my $hdfsBase = $ENV{PH_HDFS_BASE} || "/u $cfg = { #HDFS - 'inpathbase' => "$hdfsBase/test/data" - , 
'outpathbase' => "$hdfsBase/out" + 'inpathbase' => "$ENV{PH_ROOT}/data" + , 'outpathbase' => "$ENV{PH_ROOT}/testout" #LOCAL , 'localinpathbase' => "$ENV{PH_LOCAL}/in" @@ -55,7 +55,7 @@ $cfg = { , 'hcatbin' => "$ENV{HCAT_BIN}" , 'usePython' => "$ENV{PIG_USE_PYTHON}" , 'exectype' => 'spark' - , 'benchmark_exectype' => 'mapred' + , 'benchmark_exectype' => 'local' #HADOOP , 'mapredjars' => "$ENV{PH_ROOT}/lib" Modified: pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm (original) +++ pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm Fri Feb 24 03:34:37 2017 @@ -231,6 +231,11 @@ sub generateData 'rows' => 5000, 'hdfs' => "types/numbers.txt", }, { + 'name' => "biggish", + 'filetype' => "biggish", + 'rows' => 1000000, + 'hdfs' => "singlefile/biggish", + }, { 'name' => "prerank", 'filetype' => "ranking", 'rows' => 30, Modified: pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm (original) +++ pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm Fri Feb 24 03:34:37 2017 @@ -209,21 +209,11 @@ sub generateData 'filetype' => "ranking", 'rows' => 30, 'outfile' => "singlefile/prerank", - }, { - 'name' => "utf8Voter", - 'filetype' => "utf8Voter", - 'rows' => 30, - 'outfile' => "utf8Data/选民/utf8Voter", - }, { - 'name' => "utf8Student", - 'filetype' => "utf8Student", - 'rows' => 300, - 'outfile' => "utf8Data/学生/utf8Student", } ); # Create the target directories -
for my $dir ("singlefile", "utf8Data/选民", "utf8Data/学生", "dir", "types", "glob/star/somegood", + for my $dir ("singlefile", "dir", "types", "glob/star/somegood", "glob/star/moregood", "glob/star/bad") { my @cmd = ("mkdir", "-p", "$cfg->{'inpathbase'}/$dir"); $self->runCmd($log, \@cmd); Modified: pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm (original) +++ pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm Fri Feb 24 03:34:37 2017 @@ -211,6 +211,13 @@ sub runTest $testCmd->{'pig'} = $testCmd->{'pig_win'}; } + if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) { + $oldpig = $testCmd->{'pig'}; + $testCmd->{'pig'} = $testCmd->{'pig23'}; + } + if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'expected_err_regex23'}) { + $testCmd->{'expected_err_regex'} = $testCmd->{'expected_err_regex23'}; + } my $res = $self->runPigCmdLine( $testCmd, $log, 1, $resources ); if ($oldpig) { $testCmd->{'pig'} = $oldpig; @@ -224,6 +231,10 @@ sub runTest $testCmd->{'pig'} = $testCmd->{'pig_win'}; } + if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) { + $oldpig = $testCmd->{'pig'}; + $testCmd->{'pig'} = $testCmd->{'pig23'}; + } my $res = $self->runPig( $testCmd, $log, 1, $resources ); if ($oldpig) { $testCmd->{'pig'} = $oldpig; @@ -675,6 +686,9 @@ sub generateBenchmark if ((Util::isWindows()||Util::isCygwin()) && $testCmd->{'pig_win'}) { $modifiedTestCmd{'pig'} = $testCmd->{'pig_win'}; } + if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) { + $modifiedTestCmd{'pig'} = $testCmd->{'pig23'}; + } # Change so we're looking at the old version of Pig if (defined $testCmd->{'oldpigpath'} && $testCmd->{'oldpigpath'} ne "") { $modifiedTestCmd{'pigpath'} =
$testCmd->{'oldpigpath'}; @@ -1044,6 +1058,10 @@ sub wrongExecutionMode($$) } } + if (defined $testCmd->{'ignore23'} && $testCmd->{'hadoopversion'}=='23') { + $wrong = 1; + } + if ($wrong) { print $log "Skipping test $testCmd->{'group'}" . "_" . $testCmd->{'num'} . " since it is not suppsed to be run in hadoop 23\n"; Modified: pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl (original) +++ pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl Fri Feb 24 03:34:37 2017 @@ -73,7 +73,7 @@ while (<$input_handle>) { chomp; $data = $_; - if (exists $hash{$data}) + if (defined(%hash) && (exists $hash{$data})) { print $output_handle "$hash{$data}\n"; } Modified: pig/branches/spark/test/e2e/pig/tests/grunt.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/grunt.conf?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/grunt.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/grunt.conf Fri Feb 24 03:34:37 2017 @@ -46,12 +46,7 @@ $cfg = { 'execonly' => 'mapred,tez', # don't have a clue what their cwd will be for local mode 'expected_out_regex' => "/user", 'rc' => 0 - },{ - 'num' => 3, - 'pig' => "ls .", - 'execonly' => 'mapred,tez', - 'expected_out_regex' => "/user", - 'rc' => 0 + },{ 'num' => 4, 'pig' => "ls :INPATH:", @@ -82,22 +77,21 @@ $cfg = { 'rc' => 0 },{ 'num' => 10, - 'pig' => "mkdir :OUTPATH: - cp :INPATH:/singlefile/studenttab10k :OUTPATH: - ls :OUTPATH:", + 'pig' => "cp :INPATH:/singlefile/studenttab10k . 
+ ls .", 'expected_out_regex' => ".*studenttab10k", 'rc' => 0 },{ 'num' => 11, - 'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/fred - ls :OUTPATH:", + 'pig' => "cp :INPATH:/singlefile/studenttab10k ./fred + ls .", 'expected_out_regex' => ".*fred", 'rc' => 0 },{ 'num' => 12, - 'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/jim - mv :OUTPATH:/jim :OUTPATH:/bob - ls :OUTPATH:", + 'pig' => "cp :INPATH:/singlefile/studenttab10k ./jim + mv ./jim ./bob + ls .", 'expected_out_regex' => ".*bob", 'rc' => 0 },{ @@ -109,19 +103,18 @@ $cfg = { },{ 'num' => 14, 'pig' => "copyToLocal :INPATH:/singlefile/votertab10k :TMP: - copyFromLocal :TMP:/votertab10k :OUTPATH:/joe - cat :OUTPATH:/joe", + copyFromLocal :TMP:/votertab10k ./joe + cat ./joe", 'expected_out_regex' => ":Grunt_14_output:", 'rc' => 0 },{ 'num' => 15, - 'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/fred - rm :OUTPATH:/fred", - 'not_expected_out_regex' => "fred", + 'pig' => "rm fred bob joe", + 'not_expected_out_regex' => "joe", 'rc' => 0 },{ 'num' => 16, - 'pig' => "rmf :OUTPATH:/jill", + 'pig' => "rmf jill", 'rc' => 0 } ] Modified: pig/branches/spark/test/e2e/pig/tests/hcat.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/hcat.conf?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/hcat.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/hcat.conf Fri Feb 24 03:34:37 2017 @@ -44,7 +44,7 @@ stored as textfile;\, 'num' => 2, 'java_params' => ['-Dhcat.bin=:HCATBIN:'], 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -SQL drop table if exists pig_hcat_ddl_1 purge; +SQL drop table if exists pig_hcat_ddl_1; sql create table pig_hcat_ddl_1(name string, age int, gpa double) @@ -55,35 +55,6 @@ store a into ':OUTPATH:';\, }, ] }, - { - 'name' => 'Jython_HCatDDL', - 'tests' => [ - { - # sql command - 'num' => 1 - 
,'java_params' => ['-Dhcat.bin=:HCATBIN:'] - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -#create pig script - -Pig.sql("""sql drop table if exists pig_script_hcat_ddl_1;""") -ret = Pig.sql("""sql create table pig_script_hcat_ddl_1(name string, -age int, -gpa double) -stored as textfile; -""") - -if ret==0: - print "SQL command PASSED" - -else: - raise "SQL command FAILED" -\ - ,'rc' => 0 - }, - ] - }, ] } ; Modified: pig/branches/spark/test/e2e/pig/tests/multiquery.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/multiquery.conf?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/multiquery.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/multiquery.conf Fri Feb 24 03:34:37 2017 @@ -728,52 +728,6 @@ b = union a1, a2; c = rank b by name ASC, age DESC DENSE; store c into ':OUTPATH:';\, }, - { - # Union + Split + Two replicate join - 'num' => 12, - 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); -a1 = filter a by gpa is null or gpa <= 3.9; -a2 = filter a by gpa < 2; -b = union a1, a2; -c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); -c1 = filter c by age < 30; -c2 = filter c by age > 50; -d = join b by name, c1 by name using 'replicated'; -e = join d by b::name, c2 by name using 'replicated'; -store e into ':OUTPATH:';\, - }, - { - # Multiple Union + Multiple Split + Single store - 'num' => 13, - 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); -b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa); -u1 = union onschema a, b; -SPLIT u1 INTO r IF age < 30, s OTHERWISE; -c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions); -d = JOIN r BY name LEFT, c BY votername; -u2 = UNION ONSCHEMA d, s; -e = FILTER u2 BY name == 
'nick miller'; -f = FILTER u2 BY age > 70 ; -u3 = UNION ONSCHEMA e, f; -store u3 into ':OUTPATH:';\, - }, - { - # PIG-5082. Similar to MultiQuery_Union_13 but for non-store vertex group - 'num' => 14, - 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa); -b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa); -u1 = union onschema a, b; -SPLIT u1 INTO r IF age < 30, s OTHERWISE; -c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions); -d = JOIN r BY name LEFT, c BY votername; -u2 = UNION ONSCHEMA d, s; -e = FILTER u2 BY name == 'nick miller'; -f = FILTER u2 BY age > 70 ; -u3 = UNION ONSCHEMA e, f; -SPLIT u3 INTO t if age > 75, u OTHERWISE; -v = JOIN t BY name LEFT, c BY votername; -store v into ':OUTPATH:';\, - } ] # end of tests }, @@ -906,38 +860,7 @@ m = UNION e, i, j, n; n = JOIN a BY name, m BY name; store n into ':OUTPATH:';\, - }, - { - # Self join bloom left outer - 'num' => 12, - 'execonly' => 'tez', - 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); -b = filter a by gpa >= 3.9; -c = filter a by gpa > 3; -d = join b by name left outer, c by name using 'bloom'; -store d into ':OUTPATH:';\, - 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); -b = filter a by gpa >= 3.9; -c = filter a by gpa > 3; -d = join b by name left outer, c by name; -store d into ':OUTPATH:';\, - }, - { - # Self join bloom left outer with strategy as reduce - 'num' => 13, - 'execonly' => 'tez', - 'java_params' => ['-Dpig.bloomjoin.strategy=reduce'], - 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); -b = filter a by gpa >= 3.9; -c = filter a by gpa > 3; -d = join b by name left outer, c by name using 'bloom'; -store d into ':OUTPATH:';\, - 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); -b = filter a by gpa >= 3.9; -c = filter a by gpa > 3; 
-d = join b by name left outer, c by name; -store d into ':OUTPATH:';\, - }, + } ] # end of tests }, Modified: pig/branches/spark/test/e2e/pig/tests/negative.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/negative.conf?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/negative.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/negative.conf Fri Feb 24 03:34:37 2017 @@ -473,7 +473,7 @@ define CMD `perl PigStreaming.pl` input( A = load ':INPATH:/singlefile/studenttab10k'; B = stream A through CMD; store B into ':OUTPATH:';\, - 'expected_err_regex' => "Error reading output from Streaming binary|Error while reading from POStream and passing it to the streaming process", + 'expected_err_regex' => "Error reading output from Streaming binary", }, { # Invalid serializer - throws exception @@ -568,7 +568,24 @@ store D into ':OUTPATH:';\, 'expected_err_regex' => "Could not resolve StringStoreBad using imports", }, ] - } + }, + { + 'name' => 'LineageErrors', + 'tests' => [ + { + # UDF returns a bytearray that is cast to an integer + 'num' => 1, + 'pig' => q\register :FUNCPATH:/testudf.jar; +a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); +b = filter a by name lt 'b'; +c = foreach b generate org.apache.pig.test.udf.evalfunc.CreateMap((chararray)name, age); +d = foreach c generate $0#'alice young'; +split d into e if $0 < 42, f if $0 >= 42; +store e into ':OUTPATH:';\, + 'expected_err_regex' => "Received a bytearray from the UDF or Union from two different Loaders. Cannot determine how to convert the bytearray to int", + }, + ] + } ] } ;
