Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java Thu Nov 27 12:49:54 2014 @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.PigContext; @@ -48,6 +49,7 @@ import org.apache.pig.newplan.logical.op import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; import org.apache.pig.newplan.optimizer.Rule; import org.apache.pig.newplan.optimizer.Transformer; +import org.joda.time.DateTimeZone; public abstract class ConstantCalculator extends Rule { private List<LogicalRelationalOperator> processedOperators = new ArrayList<LogicalRelationalOperator>(); @@ -107,6 +109,7 @@ public abstract class ConstantCalculator public static class ConstantCalculatorExpressionVisitor extends AllSameExpressionVisitor { private LogicalRelationalOperator currentOp; private PigContext pc; + private DateTimeZone currentDTZ = null; public ConstantCalculatorExpressionVisitor(OperatorPlan expPlan, LogicalRelationalOperator currentOp, PigContext pc) throws FrontendException { super(expPlan, new ReverseDependencyOrderWalkerWOSeenChk(expPlan)); @@ -148,7 +151,11 @@ public abstract class ConstantCalculator PhysicalOperator root = expPhysicalPlan.getLeaves().get(0); try { UDFContext.getUDFContext().addJobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), true)); + PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); + PhysicalOperator.setPigLogger(pigHadoopLogger); + setDefaultTimeZone(); val = root.getNext(root.getResultType()).result; + restoreDefaultTimeZone(); UDFContext.getUDFContext().addJobConf(null); } catch (ExecException e) { throw new FrontendException(e); @@ -159,7 +166,9 @@ public abstract class ConstantCalculator UserFuncExpression udf = (UserFuncExpression)op; try { UDFContext.getUDFContext().addJobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), true)); + setDefaultTimeZone(); val = udf.getEvalFunc().exec(null); + restoreDefaultTimeZone(); UDFContext.getUDFContext().addJobConf(null); } catch (IOException e) { throw new FrontendException(e); @@ -173,6 +182,21 @@ public abstract class ConstantCalculator currentWalker.getPlan().replace(op, constantExpr); } } + + private void setDefaultTimeZone() { + String dtzStr = pc.getProperties().getProperty("pig.datetime.default.tz"); + if (dtzStr != null && dtzStr.length() > 0) { + currentDTZ = DateTimeZone.getDefault(); + DateTimeZone.setDefault(DateTimeZone.forID(dtzStr)); + } + } + + private void restoreDefaultTimeZone() { + if (currentDTZ != null) { + DateTimeZone.setDefault(currentDTZ); + currentDTZ = null; + } + } } @Override
Modified: pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g Thu Nov 27 12:49:54 2014 @@ -227,7 +227,7 @@ bag_type : ^( BAG_TYPE IDENTIFIER? tuple_type? ) ; -map_type : ^( MAP_TYPE type? ) +map_type : ^( MAP_TYPE IDENTIFIER? type? ) ; func_clause Modified: pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g Thu Nov 27 12:49:54 2014 @@ -222,7 +222,7 @@ bag_type : ^( BAG_TYPE { sb.append("bag{"); } ( { sb.append("T:"); } IDENTIFIER? tuple_type )? ) { sb.append("}"); } ; -map_type : ^( MAP_TYPE { sb.append("map["); } type? ) { sb.append("]"); } +map_type : ^( MAP_TYPE { sb.append("map["); } IDENTIFIER? type? ) { sb.append("]"); } ; func_clause Modified: pig/branches/spark/src/org/apache/pig/parser/AstValidator.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AstValidator.g?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/AstValidator.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/AstValidator.g Thu Nov 27 12:49:54 2014 @@ -276,7 +276,7 @@ tuple_type : ^( TUPLE_TYPE field_def_lis bag_type : ^( BAG_TYPE IDENTIFIER? tuple_type? ) ; -map_type : ^( MAP_TYPE type? ) +map_type : ^( MAP_TYPE IDENTIFIER? type? ) ; func_clause : ^( FUNC_REF func_name ) Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Thu Nov 27 12:49:54 2014 @@ -100,6 +100,8 @@ import org.apache.pig.newplan.logical.ru import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander; import org.apache.pig.newplan.logical.visitor.ProjectStarExpander; import org.apache.pig.newplan.logical.visitor.ResetProjectionAttachedRelationalOpVisitor; +import org.apache.pig.validator.BlackAndWhitelistFilter; +import org.apache.pig.validator.PigCommandFilter; public class LogicalPlanBuilder { @@ -123,6 +125,8 @@ public class LogicalPlanBuilder { private int storeIndex = 0; private int loadIndex = 0; + private final BlackAndWhitelistFilter filter; + private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator(); public static long getNextId(String scope) { @@ -135,6 +139,7 @@ public class LogicalPlanBuilder { this.scope = scope; this.fileNameMap = fileNameMap; this.intStream = input; + this.filter = new BlackAndWhitelistFilter(this.pigContext); } LogicalPlanBuilder(IntStream input) throws ExecException { @@ -143,6 +148,7 @@ public class LogicalPlanBuilder { this.scope = "test"; this.fileNameMap = new HashMap<String, String>(); this.intStream = input; + this.filter = new BlackAndWhitelistFilter(this.pigContext); } Operator lookupOperator(String alias) { @@ -158,10 +164,20 @@ public class LogicalPlanBuilder { } void defineCommand(String alias, StreamingCommand command) { + try { + filter.validate(PigCommandFilter.Command.DEFINE); + } catch (FrontendException e) { + throw new RuntimeException(e.getMessage()); + } pigContext.registerStreamCmd( alias, command ); } void defineFunction(String alias, FuncSpec fs) { + try { + filter.validate(PigCommandFilter.Command.DEFINE); + } catch (FrontendException e) { + throw new RuntimeException(e); + } pigContext.registerFunction( alias, fs ); } Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Nov 27 12:49:54 2014 @@ -430,24 +430,19 @@ tuple_type returns[LogicalSchema logical bag_type returns[LogicalSchema logicalSchema] : ^( BAG_TYPE IDENTIFIER? tuple_type? ) { - if ($tuple_type.logicalSchema!=null && $tuple_type.logicalSchema.size()==1 && $tuple_type.logicalSchema.getField(0).type==DataType.TUPLE) { - $logicalSchema = $tuple_type.logicalSchema; - } - else { - LogicalSchema s = new LogicalSchema(); - s.addField(new LogicalFieldSchema($IDENTIFIER.text, $tuple_type.logicalSchema, DataType.TUPLE)); - $logicalSchema = s; - } + LogicalSchema s = new LogicalSchema(); + s.addField(new LogicalFieldSchema($IDENTIFIER.text, $tuple_type.logicalSchema, DataType.TUPLE)); + $logicalSchema = s; } ; map_type returns[LogicalSchema logicalSchema] - : ^( MAP_TYPE type? ) + : ^( MAP_TYPE IDENTIFIER? type? ) { LogicalSchema s = null; if( $type.datatype != null ) { s = new LogicalSchema(); - s.addField( new LogicalFieldSchema( null, $type.logicalSchema, $type.datatype ) ); + s.addField( new LogicalFieldSchema( $IDENTIFIER.text, $type.logicalSchema, $type.datatype ) ); } $logicalSchema = s; } Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Thu Nov 27 12:49:54 2014 @@ -329,7 +329,7 @@ explicit_bag_type : BAG! implicit_bag_ty explicit_bag_type_cast : BAG LEFT_CURLY explicit_tuple_type_cast? RIGHT_CURLY -> ^( BAG_TYPE_CAST explicit_tuple_type_cast? ) ; -implicit_map_type : LEFT_BRACKET type? RIGHT_BRACKET -> ^( MAP_TYPE type? ) +implicit_map_type : LEFT_BRACKET ( ( identifier_plus COLON )? type )? RIGHT_BRACKET -> ^( MAP_TYPE identifier_plus? type? ) ; explicit_map_type : MAP! implicit_map_type Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java Thu Nov 27 12:49:54 2014 @@ -47,12 +47,15 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileLocalizer.FetchFileRet; import org.apache.pig.impl.io.ResourceNotFoundException; +import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.newplan.Operator; import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.newplan.logical.relational.LogicalSchema; import org.apache.pig.parser.QueryParser.literal_return; import org.apache.pig.parser.QueryParser.schema_return; import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.pig.validator.BlackAndWhitelistFilter; +import org.apache.pig.validator.PigCommandFilter; public class QueryParserDriver { private static final Log LOG = LogFactory.getLog(QueryParserDriver.class); @@ -367,8 +370,17 @@ public class QueryParserDriver { private boolean expandImport(Tree ast) throws ParserException { List<CommonTree> nodes = new ArrayList<CommonTree>(); traverseImport(ast, nodes); - if (nodes.isEmpty()) return false; + if (nodes.isEmpty()) + return false; + // Validate if imports are enabled/disabled + final BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter( + this.pigContext); + try { + filter.validate(PigCommandFilter.Command.IMPORT); + } catch (FrontendException e) { + throw new ParserException(e.getMessage()); + } for (CommonTree t : nodes) { macroImport(t); } @@ -562,6 +574,7 @@ public class QueryParserDriver { String macroText = null; try { + in.close(); in = new BufferedReader(new StringReader(sb.toString())); macroText = pigContext.doParamSubstitution(in); } catch (IOException e) { Modified: pig/branches/spark/src/org/apache/pig/scripting/Pig.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/Pig.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/Pig.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/Pig.java Thu Nov 27 12:49:54 2014 @@ -120,7 +120,7 @@ public class Pig { */ public static void registerUDF(String udffile, String namespace) throws IOException { - LOG.info("Register script UFD file: "+ udffile); + LOG.info("Register script UDF file: "+ udffile); ScriptPigContext ctx = getScriptContext(); ScriptEngine engine = ctx.getScriptEngine(); // script file contains only functions, no need to separate @@ -349,13 +349,16 @@ public class Pig { private static String getScriptFromFile(String filename) throws IOException { LineNumberReader rd = new LineNumberReader(new FileReader(filename)); StringBuilder sb = new StringBuilder(); - String line = rd.readLine(); - while (line != null) { - sb.append(line); - sb.append("\n"); - line = rd.readLine(); + try { + String line = rd.readLine(); + while (line != null) { + sb.append(line); + sb.append("\n"); + line = rd.readLine(); + } + } finally { + rd.close(); } - rd.close(); return sb.toString(); } Modified: pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java Thu Nov 27 12:49:54 2014 @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.pig.ExecType; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.util.UDFContext; import com.google.common.base.Charsets; @@ -100,7 +101,7 @@ public class ScriptingOutputCapturer { log.debug("TaskId: " + taskId); log.debug("hadoopLogDir: " + hadoopLogDir); - if (execType.isLocal()) { + if (execType.isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_FETCH, false)) { String logDir = System.getProperty("pig.udf.scripting.log.dir"); if (logDir == null) logDir = "."; Modified: pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java Thu Nov 27 12:49:54 2014 @@ -300,7 +300,7 @@ public class JythonScriptEngine extends // determine the relative path that the file should have in the jar int pos = apath.lastIndexOf(File.separatorChar + name.replace('.', File.separatorChar)); if (pos > 0) { - files.put(apath.substring(pos), apath); + files.put(apath.substring(pos + 1), apath); } else { files.put(apath, apath); } @@ -409,13 +409,15 @@ public class JythonScriptEngine extends throw new IOException("Can't read file: " + scriptFile); } - // TODO: fis1 is not closed FileInputStream fis1 = new FileInputStream(scriptFile); - if (hasFunction(fis1)) { - registerFunctions(scriptFile, null, pigContext); + try { + if (hasFunction(fis1)) { + registerFunctions(scriptFile, null, pigContext); + } + } finally { + fis1.close(); } - Interpreter.setMain(true); FileInputStream fis = new FileInputStream(scriptFile); try { Modified: pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java Thu Nov 27 12:49:54 2014 @@ -56,7 +56,12 @@ public class PythonScriptEngine extends } FileInputStream fin = new FileInputStream(f); - List<String[]> functions = getFunctions(fin); + List<String[]> functions = null; + try { + functions = getFunctions(fin); + } finally { + fin.close(); + } namespace = namespace == null ? "" : namespace + NAMESPACE_SEPARATOR; for(String[] functionInfo : functions) { String name = functionInfo[0]; @@ -75,7 +80,6 @@ public class PythonScriptEngine extends execType, isIllustrate })); } - fin.close(); } @Override Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Thu Nov 27 12:49:54 2014 @@ -45,6 +45,7 @@ import java.util.Set; import jline.ConsoleReader; import jline.ConsoleReaderInputStream; +import org.apache.commons.io.output.NullOutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -354,7 +355,7 @@ public class GruntParser extends PigScri List<String> params, List<String> files, boolean dontPrintOutput) throws IOException, ParseException { - + filter.validate(PigCommandFilter.Command.EXPLAIN); if (null != mExplain) { return; } @@ -390,26 +391,9 @@ public class GruntParser extends PigScri explainCurrentBatch(false); } - /** - * A {@link PrintStream} implementation which does not write anything - * Used with '-check' command line option to pig Main - * (through {@link GruntParser#explainCurrentBatch(boolean) } ) - */ - static class NullPrintStream extends PrintStream { - public NullPrintStream(String fileName) throws FileNotFoundException { - super(fileName); - } - @Override - public void write(byte[] buf, int off, int len) {} - @Override - public void write(int b) {} - @Override - public void write(byte [] b) {} - } - protected void explainCurrentBatch(boolean dontPrintOutput) throws IOException { - PrintStream lp = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out; - PrintStream ep = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out; + PrintStream lp = (dontPrintOutput) ? new PrintStream(new NullOutputStream()) : System.out; + PrintStream ep = (dontPrintOutput) ? new PrintStream(new NullOutputStream()) : System.out; if (!(mExplain.mLast && mExplain.mCount == 0)) { if (mPigServer.isBatchEmpty()) { @@ -489,13 +473,20 @@ public class GruntParser extends PigScri PigContext context = mPigServer.getPigContext(); BufferedReader reader = new BufferedReader(new FileReader(scriptPath)); - return context.doParamSubstitution(reader, params, paramFiles); + String result = context.doParamSubstitution(reader, params, paramFiles); + reader.close(); + return result; } @Override protected void processScript(String script, boolean batch, List<String> params, List<String> files) throws IOException, ParseException { + if(batch) { + filter.validate(PigCommandFilter.Command.EXEC); + } else { + filter.validate(PigCommandFilter.Command.RUN); + } if(mExplain == null) { // process only if not in "explain" mode if (script == null) { @@ -1203,10 +1194,16 @@ public class GruntParser extends PigScri } public static int runSQLCommand(String hcatBin, String cmd, boolean mInteractive) throws IOException { - String[] tokens = new String[3]; - tokens[0] = hcatBin; - tokens[1] = "-e"; - tokens[2] = cmd.substring(cmd.indexOf("sql")).substring(4); + List<String> tokensList = new ArrayList<String>(); + if (hcatBin.endsWith(".py")) { + tokensList.add("python"); + tokensList.add(hcatBin); + } else { + tokensList.add(hcatBin); + } + tokensList.add("-e"); + tokensList.add(cmd.substring(cmd.indexOf("sql")).substring(4).replaceAll("\n", " ")); + String[] tokens = tokensList.toArray(new String[]{}); // create new environment = environment - HADOOP_CLASSPATH // This is because of antlr version conflict between Pig and Hive @@ -1218,7 +1215,7 @@ public class GruntParser extends PigScri } } - log.info("Going to run hcat command: " + tokens[2]); + log.info("Going to run hcat command: " + tokens[tokens.length-1]); Process executor = Runtime.getRuntime().exec(tokens, envSet.toArray(new String[0])); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out); StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err); Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj (original) +++ pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj Thu Nov 27 12:49:54 2014 @@ -287,7 +287,7 @@ void Parse() throws IOException : {} void input() throws IOException : { String s; - Token strTok; + Token strTok = null; } { strTok = <PIG> @@ -296,14 +296,16 @@ void input() throws IOException : out.append(strTok.image ); } | - <DECLARE> + strTok = <DECLARE> ( param_value(true) // overwrite=true + { pc.validate(strTok.toString()); } ) | - <PIGDEFAULT> + strTok = <PIGDEFAULT> ( param_value(false) // overwrite=false + { pc.validate(strTok.toString()); } ) | s = paramString(){} Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java Thu Nov 27 12:49:54 2014 @@ -36,6 +36,11 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.Shell; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.validator.BlackAndWhitelistFilter; +import org.apache.pig.validator.PigCommandFilter; +import org.python.google.common.base.Preconditions; public class PreprocessorContext { @@ -44,6 +49,8 @@ public class PreprocessorContext { // used internally to detect when a param is set multiple times, // but it set with the same value so it's ok not to log a warning private Map<String, String> param_source; + + private PigContext pigContext; public Map<String, String> getParamVal() { return param_val; @@ -65,6 +72,10 @@ public class PreprocessorContext { param_source = new Hashtable<String, String>(paramVal); } + public void setPigContext(PigContext context) { + this.pigContext = context; + } + /* public void processLiteral(String key, String val) { processLiteral(key, val, true); @@ -76,7 +87,7 @@ public class PreprocessorContext { * @param key - parameter name * @param val - string containing command to be executed */ - public void processShellCmd(String key, String val) throws ParameterSubstitutionException { + public void processShellCmd(String key, String val) throws ParameterSubstitutionException, FrontendException { processShellCmd(key, val, true); } @@ -112,13 +123,18 @@ public class PreprocessorContext { * @param key - parameter name * @param val - string containing command to be executed */ - public void processShellCmd(String key, String val, Boolean overwrite) throws ParameterSubstitutionException { + public void processShellCmd(String key, String val, Boolean overwrite) throws ParameterSubstitutionException, FrontendException { + if (pigContext != null) { + BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(pigContext); + filter.validate(PigCommandFilter.Command.SH); + } if (param_val.containsKey(key)) { if (param_source.get(key).equals(val) || !overwrite) { return; } else { - log.warn("Warning : Multiple values found for " + key + ". Using value " + val); + log.warn("Warning : Multiple values found for " + key + + ". Using value " + val); } } @@ -130,6 +146,25 @@ public class PreprocessorContext { param_val.put(key, sub_val); } + public void validate(String preprocessorCmd) throws FrontendException { + if (pigContext == null) { + return; + } + + final BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(pigContext); + final String declareToken = "%declare"; + final String defaultToken = "%default"; + + if (preprocessorCmd.toLowerCase().equals(declareToken)) { + filter.validate(PigCommandFilter.Command.DECLARE); + } else if (preprocessorCmd.toLowerCase().equals(defaultToken)) { + filter.validate(PigCommandFilter.Command.DEFAULT); + } else { + throw new IllegalArgumentException("Pig Internal Error. Invalid preprocessor command specified : " + + preprocessorCmd); + } + } + /** * This method generates value for the specified key by * performing substitution if needed within the value first. Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java Thu Nov 27 12:49:54 2014 @@ -21,7 +21,6 @@ package org.apache.pig.tools.pigstats; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -182,17 +181,17 @@ public abstract class JobStats extends O * @param durations * @return median value */ - protected long calculateMedianValue(long[] durations) { + protected long calculateMedianValue(List<Long> durations) { long median; // figure out the median - Arrays.sort(durations); - int midPoint = durations.length /2; - if ((durations.length & 1) == 1) { + Collections.sort(durations); + int midPoint = durations.size() /2; + if ((durations.size() & 1) == 1) { // odd - median = durations[midPoint]; + median = durations.get(midPoint); } else { // even - median = (durations[midPoint-1] + durations[midPoint]) / 2; + median = (durations.get(midPoint-1) + durations.get(midPoint)) / 2; } return median; } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java Thu Nov 27 12:49:54 2014 @@ -18,12 +18,11 @@ package org.apache.pig.tools.pigstats; +import org.apache.pig.PigRunner; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.impl.plan.OperatorPlan; -import org.apache.pig.PigRunner; - /** * Should be implemented by an object that wants to receive notifications * from {@link PigRunner}. @@ -33,7 +32,7 @@ import org.apache.pig.PigRunner; public interface PigProgressNotificationListener extends java.util.EventListener { /** - * Invoked before any Hadoop jobs are run with the plan that is to be executed. + * Invoked before any Hadoop jobs (or a Tez DAG) are run with the plan that is to be executed. * * @param scriptId the unique id of the script * @param plan the OperatorPlan that is to be executed @@ -41,36 +40,36 @@ public interface PigProgressNotification public void initialPlanNotification(String scriptId, OperatorPlan<?> plan); /** - * Invoked just before launching Hadoop jobs spawned by the script. + * Invoked just before launching Hadoop jobs (or tez DAGs) spawned by the script. * @param scriptId the unique id of the script - * @param numJobsToLaunch the total number of Hadoop jobs spawned by the script + * @param numJobsToLaunch the total number of Hadoop jobs (or Tez DAGs) spawned by the script */ public void launchStartedNotification(String scriptId, int numJobsToLaunch); /** - * Invoked just before submitting a batch of Hadoop jobs. + * Invoked just before submitting a batch of Hadoop jobs (or Tez DAGs). * @param scriptId the unique id of the script - * @param numJobsSubmitted the number of Hadoop jobs in the batch + * @param numJobsSubmitted the number of Hadoop jobs (or Tez DAGs) in the batch */ public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted); /** - * Invoked after a Hadoop job is started. - * @param scriptId the unique id of the script - * @param assignedJobId the Hadoop job id + * Invoked after a Hadoop job (or Tez DAG) is started. + * @param scriptId the unique id of the script + * @param assignedJobId the Hadoop job id (or Tez DAG job id) */ public void jobStartedNotification(String scriptId, String assignedJobId); /** - * Invoked just after a Hadoop job is completed successfully. - * @param scriptId the unique id of the script - * @param jobStats the {@link JobStats} object associated with the Hadoop job + * Invoked just after a Hadoop job (or Tez DAG) is completed successfully. + * @param scriptId the unique id of the script + * @param jobStats the {@link JobStats} object associated with the Hadoop job (or Tez DAG) */ public void jobFinishedNotification(String scriptId, JobStats jobStats); /** * Invoked when a Hadoop job fails. - * @param scriptId the unique id of the script + * @param scriptId the unique id of the script * @param jobStats the {@link JobStats} object associated with the Hadoop job */ public void jobFailedNotification(String scriptId, JobStats jobStats); @@ -83,16 +82,16 @@ public interface PigProgressNotification public void outputCompletedNotification(String scriptId, OutputStats outputStats); /** - * Invoked to update the execution progress. + * Invoked to update the execution progress. * @param scriptId the unique id of the script * @param progress the percentage of the execution progress */ public void progressUpdatedNotification(String scriptId, int progress); /** - * Invoked just after all Hadoop jobs spawned by the script are completed. + * Invoked just after all Hadoop jobs (Tez DAGs) spawned by the script are completed. * @param scriptId the unique id of the script - * @param numJobsSucceeded the total number of Hadoop jobs succeeded + * @param numJobsSucceeded the total number of Hadoop jobs (Tez DAGs) succeeded */ public void launchCompletedNotification(String scriptId, int numJobsSucceeded); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Thu Nov 27 12:49:54 2014 @@ -21,6 +21,8 @@ package org.apache.pig.tools.pigstats; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.StatusReporter; import org.apache.hadoop.util.Progressable; +import org.apache.pig.JVMReuseManager; +import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.hadoop.executionengine.TaskContext; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; @@ -33,6 +35,15 @@ public class PigStatusReporter extends S private TaskContext<?> context = null; + static { + JVMReuseManager.getInstance().registerForStaticDataCleanup(PigStatusReporter.class); + } + + @StaticDataCleanup + public static void staticDataCleanup() { + reporter = null; + } + private PigStatusReporter() { } @@ -46,10 +57,6 @@ public class PigStatusReporter extends S return reporter; } - public void destroy() { - context = null; - } - public void setContext(TaskContext<?> context) { this.context = context; } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Nov 27 12:49:54 2014 @@ -296,7 +296,7 @@ public abstract class ScriptState { // restrict the size of the script to be stored in job conf int maxScriptSize = 10240; if (pigContext != null) { - String prop = pigContext.getProperties().getProperty(PigConfiguration.MAX_SCRIPT_SIZE); + String prop = pigContext.getProperties().getProperty(PigConfiguration.PIG_SCRIPT_MAX_SIZE); if (prop != null) { maxScriptSize = Integer.valueOf(prop); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Thu Nov 27 12:49:54 2014 @@ -31,9 +31,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.TaskType; @@ -70,7 +68,7 @@ public final class MRJobStats extends Jo } public static final String SUCCESS_HEADER = "JobId\tMaps\tReduces\t" + - "MaxMapTime\tMinMapTIme\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" + + "MaxMapTime\tMinMapTime\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" + "MinReduceTime\tAvgReduceTime\tMedianReducetime\tAlias\tFeature\tOutputs"; public static final String FAILURE_HEADER = "JobId\tAlias\tFeature\tMessage\tOutputs"; @@ -349,13 +347,13 @@ public final class MRJobStats extends Jo } void addMapReduceStatistics(Job job) { - TaskReport[] maps = null; + Iterator<TaskReport> maps = null; try { maps = HadoopShims.getTaskReports(job, TaskType.MAP); } catch (IOException e) { LOG.warn("Failed to get map task report", e); } - TaskReport[] reduces = null; + Iterator<TaskReport> reduces = null; try { reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE); } catch (IOException e) { @@ -364,21 +362,22 @@ public final class MRJobStats extends Jo addMapReduceStatistics(maps, reduces); } - private TaskStat getTaskStat(TaskReport[] tasks) { - int size = tasks.length; + private TaskStat getTaskStat(Iterator<TaskReport> tasks) { + int size = 0; long max = 0; long min = Long.MAX_VALUE; long median = 0; long total = 0; - long durations[] = new long[size]; + List<Long> durations = new ArrayList<Long>(); - for (int i = 0; i < tasks.length; i++) { - TaskReport rpt = tasks[i]; + while(tasks.hasNext()){ + TaskReport rpt = tasks.next(); long duration = rpt.getFinishTime() - rpt.getStartTime(); - durations[i] = duration; + durations.add(duration); max = (duration > max) ? duration : max; min = (duration < min) ? duration : min; total += duration; + size++; } long avg = total / size; @@ -387,8 +386,8 @@ public final class MRJobStats extends Jo return new TaskStat(size, max, min, avg, median); } - private void addMapReduceStatistics(TaskReport[] maps, TaskReport[] reduces) { - if (maps != null && maps.length > 0) { + private void addMapReduceStatistics(Iterator<TaskReport> maps, Iterator<TaskReport> reduces) { + if (maps != null && maps.hasNext()) { TaskStat st = getTaskStat(maps); setMapStat(st.size, st.max, st.min, st.avg, st.median); } else { @@ -398,7 +397,7 @@ public final class MRJobStats extends Jo } } - if (reduces != null && reduces.length > 0) { + if (reduces != null && reduces.hasNext()) { TaskStat st = getTaskStat(reduces); setReduceStat(st.size, st.max, st.min, st.avg, st.median); } else { Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Thu Nov 27 12:49:54 2014 @@ -20,9 +20,10 @@ package org.apache.pig.tools.pigstats.te import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,15 +32,21 @@ import org.apache.pig.LoadFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.OperatorPlan; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.newplan.Operator; import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.PigProgressNotificationListener; import org.apache.pig.tools.pigstats.PigStats; -import org.apache.pig.tools.pigstats.PigStats.JobGraph; import org.apache.pig.tools.pigstats.ScriptState; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** @@ -52,9 +59,8 @@ import com.google.common.collect.Maps; public class TezScriptState extends ScriptState { private static final Log LOG = LogFactory.getLog(TezScriptState.class); - private Map<TezOperator, String> featureMap = null; - private Map<TezOperator, String> aliasMap = Maps.newHashMap(); - private Map<TezOperator, String> aliasLocationMap = Maps.newHashMap(); + private List<PigTezProgressNotificationListener> tezListeners = Lists.newArrayList(); + private Map<String, TezDAGScriptInfo> dagScriptInfo = Maps.newHashMap(); public TezScriptState(String id) { super(id); @@ -64,13 +70,48 @@ public class TezScriptState extends Scri return (TezScriptState) ScriptState.get(); } - public void addSettingsToConf(TezOperator tezOp, Configuration conf) { + @Override + public void registerListener(PigProgressNotificationListener listener) { + super.registerListener(listener); + if (listener instanceof PigTezProgressNotificationListener) { + tezListeners.add((PigTezProgressNotificationListener) listener); + } + } + + public void dagLaunchNotification(String dagName, OperatorPlan<?> dagPlan, int numVerticesToLaunch) { + for (PigTezProgressNotificationListener listener: tezListeners) { + listener.dagLaunchNotification(id, dagName, dagPlan, numVerticesToLaunch); + } + } + + public void dagStartedNotification(String dagName, String assignedApplicationId) { + for (PigTezProgressNotificationListener listener: tezListeners) { + listener.dagStartedNotification(id, dagName, assignedApplicationId); + } + } + + public void dagProgressNotification(String dagName, int numVerticesCompleted, int progress) { + for (PigTezProgressNotificationListener listener: tezListeners) { + listener.dagProgressNotification(id, dagName, numVerticesCompleted, progress); + } + } + + public void dagCompletedNotification(String dagName, TezDAGStats tezDAGStats) { + for (PigTezProgressNotificationListener listener: tezListeners) { + listener.dagCompletedNotification(id, dagName, tezDAGStats.isSuccessful(), tezDAGStats); + } + } + + public void addDAGSettingsToConf(Configuration conf) { LOG.info("Pig script settings are added to the job"); conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion()); conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion()); conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id); conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript()); conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine()); + } + + public void addVertexSettingsToConf(String dagName, TezOperator tezOp, Configuration conf) { try { List<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class); @@ -95,8 +136,8 @@ public class TezScriptState extends Scri LOG.warn("unable to get the map loads", e); } - setPigFeature(tezOp, conf); - setJobParents(tezOp, conf); + setPigFeature(dagName, tezOp, conf); + setJobParents(dagName, tezOp, conf); conf.set("mapreduce.workflow.id", "pig_" + id); conf.set("mapreduce.workflow.name", getFileName().isEmpty() ? "default" : getFileName()); @@ -116,32 +157,30 @@ public class TezScriptState extends Scri } } - private void setPigFeature(TezOperator tezOp, Configuration conf) { - conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), getPigFeature(tezOp)); + private void setPigFeature(String dagName, TezOperator tezOp, Configuration conf) { + if (tezOp.isVertexGroup()) { + return; + } + TezDAGScriptInfo dagInfo = getDAGScriptInfo(dagName); + conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), dagInfo.getPigFeatures(tezOp)); if (scriptFeatures != 0) { conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(), String.valueOf(scriptFeatures)); } - conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(tezOp)); - conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), getAliasLocation(tezOp)); + conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), dagInfo.getAlias(tezOp)); + conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), dagInfo.getAliasLocation(tezOp)); } - private void setJobParents(TezOperator tezOp, Configuration conf) { + private void setJobParents(String dagName, TezOperator tezOp, Configuration conf) { + if (tezOp.isVertexGroup()) { + return; + } // PigStats maintains a job DAG with the job id being updated // upon available. Therefore, before a job is submitted, the ids // of its parent jobs are already available. - JobGraph jg = PigStats.get().getJobGraph(); - JobStats js = null; - Iterator<JobStats> iter = jg.iterator(); - while (iter.hasNext()) { - JobStats job = iter.next(); - if (job.getName().equals(tezOp.getOperatorKey().toString())) { - js = job; - break; - } - } + JobStats js = ((TezPigScriptStats)PigStats.get()).getVertexStats(dagName, tezOp.getOperatorKey().toString()); if (js != null) { - List<Operator> preds = jg.getPredecessors(js); + List<Operator> preds = js.getPlan().getPredecessors(js); if (preds != null) { StringBuilder sb = new StringBuilder(); for (Operator op : preds) { @@ -154,87 +193,173 @@ public class TezScriptState extends Scri } } - public String getAlias(TezOperator tezOp) { - if (!aliasMap.containsKey(tezOp)) { - setAlias(tezOp); - } - return aliasMap.get(tezOp); + public TezDAGScriptInfo setDAGScriptInfo(TezPlanContainerNode tezPlanNode) { + TezDAGScriptInfo info = new TezDAGScriptInfo(tezPlanNode.getTezOperPlan()); + dagScriptInfo.put(tezPlanNode.getOperatorKey().toString(), info); + return info; } - private void setAlias(TezOperator tezOp) { - ArrayList<String> alias = new ArrayList<String>(); - String aliasLocationStr = ""; - try { - ArrayList<String> aliasLocation = new ArrayList<String>(); - new AliasVisitor(tezOp.plan, alias, aliasLocation).visit(); - aliasLocationStr += LoadFunc.join(aliasLocation, ","); - if (!alias.isEmpty()) { - Collections.sort(alias); - } - } catch (VisitorException e) { - LOG.warn("unable to get alias", e); - } - aliasMap.put(tezOp, LoadFunc.join(alias, ",")); - aliasLocationMap.put(tezOp, aliasLocationStr); + public TezDAGScriptInfo getDAGScriptInfo(String dagName) { + return dagScriptInfo.get(dagName); } - public String getAliasLocation(TezOperator tezOp) { - if (!aliasLocationMap.containsKey(tezOp)) { - setAlias(tezOp); - } - return aliasLocationMap.get(tezOp); - } + static class TezDAGScriptInfo { - public String getPigFeature(TezOperator tezOp) { - if (featureMap == null) { - featureMap = Maps.newHashMap(); - } + private static final Log LOG = LogFactory.getLog(TezDAGScriptInfo.class); + private TezOperPlan tezPlan; + private String alias; + private String aliasLocation; + private String features; + + private Map<OperatorKey, String> featuresMap = Maps.newHashMap(); + private Map<OperatorKey, String> aliasMap = Maps.newHashMap(); + private Map<OperatorKey, String> aliasLocationMap = Maps.newHashMap(); + + class DAGAliasVisitor extends TezOpPlanVisitor { + + private Set<String> aliases; + private Set<String> aliasLocations; + private BitSet featureSet; + + public DAGAliasVisitor(TezOperPlan plan) { + super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + this.aliases = new HashSet<String>(); + this.aliasLocations = new HashSet<String>(); + this.featureSet = new BitSet(); + } + + @Override + public void visitTezOp(TezOperator tezOp) throws VisitorException { + if (tezOp.isVertexGroup()) { + featureSet.set(PIG_FEATURE.UNION.ordinal()); + return; + } + ArrayList<String> aliasList = new ArrayList<String>(); + String aliasLocationStr = ""; + try { + ArrayList<String> aliasLocationList = new ArrayList<String>(); + new AliasVisitor(tezOp.plan, aliasList, aliasLocationList).visit(); + aliasLocationStr += LoadFunc.join(aliasLocationList, ","); + if (!aliasList.isEmpty()) { + Collections.sort(aliasList); + aliases.addAll(aliasList); + aliasLocations.addAll(aliasLocationList); + } + } catch (VisitorException e) { + LOG.warn("unable to get alias", e); + } + aliasMap.put(tezOp.getOperatorKey(), LoadFunc.join(aliasList, ",")); + aliasLocationMap.put(tezOp.getOperatorKey(), aliasLocationStr); - String retStr = featureMap.get(tezOp); - if (retStr == null) { - BitSet feature = new BitSet(); - feature.clear(); - if (tezOp.isSkewedJoin()) { - feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal()); - } - if (tezOp.isGlobalSort()) { - feature.set(PIG_FEATURE.ORDER_BY.ordinal()); - } - if (tezOp.isSampler()) { - feature.set(PIG_FEATURE.SAMPLER.ordinal()); - } - if (tezOp.isIndexer()) { - feature.set(PIG_FEATURE.INDEXER.ordinal()); - } - if (tezOp.isCogroup()) { - feature.set(PIG_FEATURE.COGROUP.ordinal()); - } - if (tezOp.isGroupBy()) { - feature.set(PIG_FEATURE.GROUP_BY.ordinal()); - } - if (tezOp.isRegularJoin()) { - feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); - } - if (tezOp.isUnion()) { - feature.set(PIG_FEATURE.UNION.ordinal()); + + BitSet feature = new BitSet(); + feature.clear(); + if (tezOp.isSkewedJoin()) { + feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal()); + } + if (tezOp.isGlobalSort()) { + feature.set(PIG_FEATURE.ORDER_BY.ordinal()); + } + if (tezOp.isSampler()) { + feature.set(PIG_FEATURE.SAMPLER.ordinal()); + } + if (tezOp.isIndexer()) { + feature.set(PIG_FEATURE.INDEXER.ordinal()); + } + if (tezOp.isCogroup()) { + feature.set(PIG_FEATURE.COGROUP.ordinal()); + } + if (tezOp.isGroupBy()) { + feature.set(PIG_FEATURE.GROUP_BY.ordinal()); + } + if (tezOp.isRegularJoin()) { + feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); + } + if (tezOp.isUnion()) { + feature.set(PIG_FEATURE.UNION.ordinal()); + } + if (tezOp.isNative()) { + feature.set(PIG_FEATURE.NATIVE.ordinal()); + } + if (tezOp.isLimit() || tezOp.isLimitAfterSort()) { + feature.set(PIG_FEATURE.LIMIT.ordinal()); + } + try { + new FeatureVisitor(tezOp.plan, feature).visit(); + } catch (VisitorException e) { + LOG.warn("Feature visitor failed", e); + } + StringBuilder sb = new StringBuilder(); + for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) { + if (sb.length() > 0) sb.append(","); + sb.append(PIG_FEATURE.values()[i].name()); + } + featuresMap.put(tezOp.getOperatorKey(), sb.toString()); + for (int i=0; i < feature.length(); i++) { + if (feature.get(i)) { + featureSet.set(i); + } + } } - if (tezOp.isNative()) { - feature.set(PIG_FEATURE.NATIVE.ordinal()); + + @Override + public void visit() throws VisitorException { + super.visit(); + if (!aliases.isEmpty()) { + ArrayList<String> aliasList = new ArrayList<String>(aliases); + ArrayList<String> aliasLocationList = new ArrayList<String>(aliasLocations); + Collections.sort(aliasList); + Collections.sort(aliasLocationList); + alias = LoadFunc.join(aliasList, ","); + aliasLocation = LoadFunc.join(aliasLocationList, ","); + } + StringBuilder sb = new StringBuilder(); + for (int i = featureSet.nextSetBit(0); i >= 0; i = featureSet.nextSetBit(i+1)) { + if (sb.length() > 0) sb.append(","); + sb.append(PIG_FEATURE.values()[i].name()); + } + features = sb.toString(); } + + } + + public TezDAGScriptInfo(TezOperPlan tezPlan) { + this.tezPlan = tezPlan; + initialize(); + } + + private void initialize() { try { - new FeatureVisitor(tezOp.plan, feature).visit(); + new DAGAliasVisitor(tezPlan).visit(); } catch (VisitorException e) { - LOG.warn("Feature visitor failed", e); + LOG.warn("Cannot calculate alias information for DAG", e); } - StringBuilder sb = new StringBuilder(); - for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) { - if (sb.length() > 0) sb.append(","); - sb.append(PIG_FEATURE.values()[i].name()); - } - retStr = sb.toString(); - featureMap.put(tezOp, retStr); } - return retStr; + + public String getAlias() { + return alias; + } + + public String getAliasLocation() { + return aliasLocation; + } + + public String getPigFeatures() { + return features; + } + + public String getAlias(TezOperator tezOp) { + return aliasMap.get(tezOp.getOperatorKey()); + } + + public String getAliasLocation(TezOperator tezOp) { + return aliasLocationMap.get(tezOp.getOperatorKey()); + } + + public String getPigFeatures(TezOperator tezOp) { + return featuresMap.get(tezOp.getOperatorKey()); + } + } } Modified: pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java (original) +++ pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java Thu Nov 27 12:49:54 2014 @@ -35,12 +35,17 @@ public final class BlackAndWhitelistFilt private static final Splitter SPLITTER = Splitter.on(',').trimResults() .omitEmptyStrings(); - private final PigServer pigServer; + private final PigContext context; private final Set<String> whitelist; private final Set<String> blacklist; public BlackAndWhitelistFilter(PigServer pigServer) { - this.pigServer = pigServer; + this(pigServer.getPigContext()); + } + + public BlackAndWhitelistFilter(PigContext context) { + this.context = context; + whitelist = Sets.newHashSet(); blacklist = Sets.newHashSet(); @@ -48,7 +53,6 @@ public final class BlackAndWhitelistFilt } private void init() { - PigContext context = pigServer.getPigContext(); String whitelistConfig = context.getProperties().getProperty(PigConfiguration.PIG_WHITELIST); if (whitelistConfig != null) { Modified: pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java (original) +++ pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java Thu Nov 27 12:49:54 2014 @@ -34,7 +34,7 @@ import org.apache.pig.impl.logicalLayer. public interface PigCommandFilter { public enum Command { - FS, LS, SH, MAPREDUCE, REGISTER, SET, CAT, CD, DUMP, KILL, PWD, MV, CP, COPYTOLOCAL, COPYFROMLOCAL, MKDIR, RM, RMF, ILLUSTRATE + DEFINE, DECLARE, DEFAULT, EXPLAIN, EXEC, FS, IMPORT, LS, SH, MAPREDUCE, REGISTER, SET, CAT, CD, DUMP, KILL, PWD, MV, CP, COPYTOLOCAL, COPYFROMLOCAL, MKDIR, RM, RMF, RUN, ILLUSTRATE } /** Modified: pig/branches/spark/src/python/streaming/controller.py URL: http://svn.apache.org/viewvc/pig/branches/spark/src/python/streaming/controller.py?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/python/streaming/controller.py (original) +++ pig/branches/spark/src/python/streaming/controller.py Thu Nov 27 12:49:54 2014 @@ -96,7 +96,7 @@ class PythonStreamingController: if is_illustrate or udf_logging.udf_log_level != logging.DEBUG: #Only log output for illustrate after we get the flag to capture output. - sys.stdout = open("/dev/null", 'w') + sys.stdout = open(os.devnull, 'w') else: sys.stdout = self.output_stream @@ -157,7 +157,14 @@ class PythonStreamingController: input_str = input_stream.readline() while input_str.endswith(END_RECORD_DELIM) == False: - input_str += input_stream.readline() + line = input_stream.readline() + if line == '': + input_str = '' + break + input_str += line + + if input_str == '': + return END_OF_STREAM if input_str == TURN_ON_OUTPUT_CAPTURING: logging.debug("Turned on Output Capturing") @@ -297,6 +304,12 @@ def _deserialize_collection(input_str, r else: return list_result +def wrap_tuple(o, serialized_item): + if type(o) != tuple: + return WRAPPED_TUPLE_START + serialized_item + WRAPPED_TUPLE_END + else: + return serialized_item + def serialize_output(output, utfEncodeAllFields=False): """ @param utfEncodeStrings - Generally we want to utf encode only strings. But for @@ -314,7 +327,7 @@ def serialize_output(output, utfEncodeAl WRAPPED_TUPLE_END) elif output_type == list: return (WRAPPED_BAG_START + - WRAPPED_FIELD_DELIMITER.join([serialize_output(o, utfEncodeAllFields) for o in output]) + + WRAPPED_FIELD_DELIMITER.join([wrap_tuple(o, serialize_output(o, utfEncodeAllFields)) for o in output]) + WRAPPED_BAG_END) elif output_type == dict: return (WRAPPED_MAP_START + Modified: pig/branches/spark/test/e2e/harness/TestDriver.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/harness/TestDriver.pm?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/harness/TestDriver.pm (original) +++ pig/branches/spark/test/e2e/harness/TestDriver.pm Thu Nov 27 12:49:54 2014 @@ -717,7 +717,7 @@ sub runTestGroup() { $testStatuses->{$testName} = $failedStr; } - $msg .= "\nEnding test $testName at " . time ."\n"; + $msg .= " at " . time . "\nEnding test $testName at " . time ."\n"; print $subLog $msg; $duration = $endTime - $beginTime; $dbinfo{'duration'} = $duration; Modified: pig/branches/spark/test/e2e/pig/build.xml URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/build.xml?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/build.xml (original) +++ pig/branches/spark/test/e2e/pig/build.xml Thu Nov 27 12:49:54 2014 @@ -18,15 +18,14 @@ <project name="TestHarnessPigTests" default="test"> <property name="pig.dir" value="${basedir}/../../.."/> - <property name="ivy.dir" location="${pig.dir}/ivy" /> + <property name="pig.base.dir" value="${basedir}/../../.."/> + <property name="ivy.dir" location="${pig.base.dir}/ivy" /> <loadproperties srcfile="${ivy.dir}/libraries.properties"/> - <property name="jython.jar" - value="${pig.dir}/build/ivy/lib/Pig/jython-standalone-${jython.version}.jar"/> - <property name="jruby.jar" - value="${pig.dir}/build/ivy/lib/Pig/jruby-complete-${jruby.version}.jar"/> + <property name="piggybank.jar" + value="${pig.base.dir}/contrib/piggybank/java/piggybank.jar"/> <property name="hive.lib.dir" - value="${pig.dir}/build/ivy/lib/Pig"/> + value="${pig.base.dir}/build/ivy/lib/Pig"/> <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S"> <equals arg1="${hadoopversion}" arg2="23" /> @@ -216,23 +215,14 @@ <!-- Check that the necessary properties are setup --> <target name="property-check"> - <fail message="Please set the property harness.cluster.conf to the conf directory of your hadoop installation or harness.hadoop.home to HADOOP_HOME for your Hadoop installation."> + <fail message="Please set the property harness.cluster.conf to the conf directory of your hadoop installation,harness.hadoop.home to HADOOP_HOME for your Hadoop installation and harness.cluster.bin to the binary executable of your hadoop installation."> <condition> <not> - <or> + <and> <isset property="harness.cluster.conf"/> <isset property="harness.hadoop.home"/> - </or> - </not> - </condition> - </fail> - <fail message="Please set the property harness.cluster.bin to the binary executable of your hadoop installation or harness.hadoop.home to HADOOP_HOME for your Hadoop installation."> - <condition> - <not> - <or> <isset property="harness.cluster.bin"/> - <isset property="harness.hadoop.home"/> - </or> + </and> </not> </condition> </fail> @@ -279,6 +269,7 @@ <env key="HADOOP_CONF_DIR" value="${harness.cluster.conf}"/> <env key="PIG_USE_PYTHON" value="${harness.use.python}"/> <env key="PH_CLUSTER_BIN" value="${harness.cluster.bin}"/> + <env key="PH_PIGGYBANK_JAR" value="${piggybank.jar}"/> <env key="PH_HIVE_LIB_DIR" value="${hive.lib.dir}"/> <env key="PH_HIVE_VERSION" value="${hive.version}"/> <env key="PH_HIVE_SHIMS_VERSION" value="${hive.hadoop.shims.version}"/> Modified: pig/branches/spark/test/e2e/pig/conf/default.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/default.conf?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/conf/default.conf (original) +++ pig/branches/spark/test/e2e/pig/conf/default.conf Thu Nov 27 12:49:54 2014 @@ -49,7 +49,7 @@ $cfg = { , 'testconfigpath' => "$ENV{HADOOP_CONF_DIR}" , 'funcjarPath' => "$ENV{PH_ROOT}/lib/java" , 'paramPath' => "$ENV{PH_ROOT}/paramfiles" - , 'piggybankjarPath' => "$ENV{PH_PIG}/contrib/piggybank/java" + , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}" , 'pigpath' => "$ENV{PH_PIG}" , 'oldpigpath' => "$ENV{PH_OLDPIG}" , 'hcatbin' => "$ENV{HCAT_BIN}" Modified: pig/branches/spark/test/e2e/pig/conf/local.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/local.conf?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/conf/local.conf (original) +++ pig/branches/spark/test/e2e/pig/conf/local.conf Thu Nov 27 12:49:54 2014 @@ -15,7 +15,7 @@ # limitations under the License. my $me = `whoami`; -chomp $me; +$me =~ s/[^a-zA-Z0-9]*//g; # The contents of this file can be rewritten to fit your installation. # Also, you can define the following environment variables and set things up as in the test setup @@ -40,13 +40,13 @@ $cfg = { #TEST , 'benchmarkPath' => "$ENV{PH_OUT}/benchmarks" , 'scriptPath' => "$ENV{PH_ROOT}/libexec" - , 'tmpPath' => '/tmp/pigtest' + , 'tmpPath' => 'tmp/pigtest' #PIG , 'testconfigpath' => "$ENV{PH_CLUSTER}/conf/" , 'funcjarPath' => "$ENV{PH_ROOT}/lib/java" , 'paramPath' => "$ENV{PH_ROOT}/paramfiles" - , 'piggybankjarPath' => "$ENV{PH_PIG}/contrib/piggybank/java" + , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}" , 'pigpath' => "$ENV{PH_PIG}" , 'oldpigpath' => "$ENV{PH_OLDPIG}" , 'exectype' => 'local' Modified: pig/branches/spark/test/e2e/pig/conf/rpm.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/rpm.conf?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/conf/rpm.conf (original) +++ pig/branches/spark/test/e2e/pig/conf/rpm.conf Thu Nov 27 12:49:54 2014 @@ -15,7 +15,7 @@ # limitations under the License. my $me = `whoami`; -chomp $me; +$me =~ s/[^a-zA-Z0-9]*//g; # The contents of this file can be rewritten to fit your installation. # Also, you can define the following environment variables and set things up as in the test setup @@ -43,14 +43,13 @@ $cfg = { #TEST , 'benchmarkPath' => "$ENV{PH_OUT}/benchmarks" , 'scriptPath' => "$ENV{PH_ROOT}/libexec" - , 'tmpPath' => '/tmp/pigtest' + , 'tmpPath' => 'tmp/pigtest' #PIG , 'testconfigpath' => "$ENV{HADOOP_CONF_DIR}" , 'funcjarPath' => "$ENV{PH_ROOT}/lib/java" - , 'piggybankjarPath' => "/usr/lib/pig" + , 'piggybankjarPath' => "/usr/lib/pig/lib/piggybank.jar" , 'paramPath' => "$ENV{PH_ROOT}/paramfiles" - , 'piggybankjarPath' => "/usr/lib/pig" , 'pigpath' => "/usr" , 'oldpigpath' => "$ENV{PH_OLDPIG}" , 'hcatbin' => "$ENV{HCAT_BIN}" Modified: pig/branches/spark/test/e2e/pig/conf/tez.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/tez.conf?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/conf/tez.conf (original) +++ pig/branches/spark/test/e2e/pig/conf/tez.conf Thu Nov 27 12:49:54 2014 @@ -15,7 +15,7 @@ # limitations under the License. my $me = `whoami`; -chomp $me; +$me =~ s/[^a-zA-Z0-9]*//g; # The contents of this file can be rewritten to fit your installation. # Also, you can define the following environment variables and set things up as in the test setup @@ -43,13 +43,13 @@ $cfg = { #TEST , 'benchmarkPath' => "$ENV{PH_OUT}/benchmarks" , 'scriptPath' => "$ENV{PH_ROOT}/libexec" - , 'tmpPath' => '/tmp/pigtest' + , 'tmpPath' => 'tmp/pigtest' #PIG , 'testconfigpath' => "$ENV{HADOOP_CONF_DIR}" , 'funcjarPath' => "$ENV{PH_ROOT}/lib/java" , 'paramPath' => "$ENV{PH_ROOT}/paramfiles" - , 'piggybankjarPath' => "$ENV{PH_PIG}/contrib/piggybank/java" + , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}" , 'pigpath' => "$ENV{PH_PIG}" , 'oldpigpath' => "$ENV{PH_OLDPIG}" , 'hcatbin' => "$ENV{HCAT_BIN}" Modified: pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm (original) +++ pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm Thu Nov 27 12:49:54 2014 @@ -120,11 +120,6 @@ sub generateData 'rows' => 10000, 'outfile' => "singlefile/studenttab10k", }, { - 'name' => "studenttab20m", - 'filetype' => "studenttab", - 'rows' => 20000000, - 'outfile' => "singlefile/studenttab20m", - }, { 'name' => "votertab10k", 'filetype' => "votertab", 'rows' => 10000, @@ -210,11 +205,6 @@ sub generateData 'rows' => 5000, 'outfile' => "types/numbers.txt", }, { - 'name' => "biggish", - 'filetype' => "biggish", - 'rows' => 1000000, - 'outfile' => "singlefile/biggish", - }, { 'name' => "prerank", 'filetype' => "ranking", 'rows' => 30, Modified: pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm (original) +++ pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm Thu Nov 27 12:49:54 2014 @@ -73,7 +73,7 @@ sub replaceParameters $cmd =~ s/:INPATH:/$testCmd->{'inpathbase'}/g; $cmd =~ s/:OUTPATH:/$outfile/g; $cmd =~ s/:FUNCPATH:/$testCmd->{'funcjarPath'}/g; - $cmd =~ s/:PIGGYBANKPATH:/$testCmd->{'piggybankjarPath'}/g; + $cmd =~ s/:PIGGYBANKJAR:/$testCmd->{'piggybankjarPath'}/g; $cmd =~ s/:PIGPATH:/$testCmd->{'pigpath'}/g; $cmd =~ s/:RUNID:/$testCmd->{'UID'}/g; $cmd =~ s/:USRHOMEPATH:/$testCmd->{'userhomePath'}/g; @@ -672,6 +672,9 @@ sub generateBenchmark if ((Util::isWindows()||Util::isCygwin()) && $testCmd->{'pig_win'}) { $modifiedTestCmd{'pig'} = $testCmd->{'pig_win'}; } + if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) { + $modifiedTestCmd{'pig'} = $testCmd->{'pig23'}; + } # Change so we're looking at the old version of Pig if (defined $testCmd->{'oldpigpath'} && $testCmd->{'oldpigpath'} ne "") { $modifiedTestCmd{'pigpath'} = $testCmd->{'oldpigpath'}; Modified: pig/branches/spark/test/e2e/pig/tests/bigdata.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/bigdata.conf?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/bigdata.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/bigdata.conf Thu Nov 27 12:49:54 2014 @@ -28,11 +28,12 @@ $cfg = { 'groups' => [ { - 'name' => 'BigData', + 'name' => 'BigData_Checkin', 'tests' => [ { 'num' => 1, ,'floatpostprocess' => 1 + ,'java_params' => ['-Dpig.tez.auto.parallelism=false'] ,'delimiter' => ' ', 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); @@ -48,18 +49,6 @@ store i into ':OUTPATH:';\, { 'num' => 2, 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); -a1 = filter a by age < '50'; -b = group a1 by (name, age); -c = foreach b generate group as g, AVG(a1.gpa); -d = filter c by $1 > 3.0; -d1 = foreach d generate g.$0 as name, g.$1 as age, $1 as gpa; -e = group d1 by name; -f = foreach e generate group, AVG(d1.age); -store f into ':OUTPATH:';\, - }, - { - 'num' => 3, - 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); c = foreach a generate name; d = foreach b generate name; @@ -68,57 +57,84 @@ f = union d, e; g = distinct f parallel 20; store g into ':OUTPATH:';\, }, - { - 'num' => 4, - 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); + ] + }, + { + 'name' => 'BigData_Group', + 'tests' => [ + { + 'num' => 1, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); +a1 = filter a by age < '50'; +b = group a1 by (name, age); +c = foreach b generate group as g, AVG(a1.gpa); +d = filter c by $1 > 3.0; +d1 = foreach d generate g.$0 as name, g.$1 as age, $1 as gpa; +e = group d1 by name; +f = foreach e generate group, AVG(d1.age); +store f into ':OUTPATH:';\, + }, + { + 'num' => 2, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); b = group a all parallel 20; c = foreach b generate COUNT(a.$0); store c into ':OUTPATH:';\, - }, - { - 'num' => 5, - 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); + }, + { + 'num' => 3, + 'java_params' => ['-Dpig.exec.mapPartAgg=true'], + 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); b = group a by name parallel 20; c = foreach b generate group, COUNT($1); store c into ':OUTPATH:';\, - }, - { - 'num' => 6, - 'pig' => q\ + }, + ] + }, + { + 'name' => 'BigData_Stream', + 'tests' => [ + { + 'num' => 1, + 'pig' => q\ define cmd `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD' limit 3); a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); b = stream a through cmd as (n, a, g); c = foreach b generate n, a; store c into ':OUTPATH:';\, - }, - { - 'num' => 7, - 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); + }, + ] + }, + { + 'name' => 'BigData_Order', + 'tests' => [ + { + 'num' => 1, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); b = order a by name parallel 20; store b into ':OUTPATH:';\, - 'sortArgs' => ['-t', ' ', '-k', '1,1'], - }, - { - 'num' => 8, - 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double); + 'sortArgs' => ['-t', ' ', '-k', '1,1'], + }, + { + 'num' => 2, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double); b = order a by name, age desc parallel 20; store b into ':OUTPATH:';\, - 'sortArgs' => ['-t', ' ', '-k', '1,1', '-k', '2nr,2nr'], - }, - { - 'num' => 9, - 'pig' => q\A = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); + 'sortArgs' => ['-t', ' ', '-k', '1,1', '-k', '2nr,2nr'], + }, + { + 'num' => 3, + 'pig' => q\A = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa); B = filter A by age > 20; C = group B by name; D = foreach C generate group, COUNT(B) PARALLEL 16; E = order D by $0 PARALLEL 16; F = limit E 10; store F into ':OUTPATH:';\, - 'sortArgs' => ['-t', ' ', '-k', '1,1'], - }, - - ] - }, + 'sortArgs' => ['-t', ' ', '-k', '1,1'], + }, + ] + }, ] } ; Modified: pig/branches/spark/test/e2e/pig/tests/grunt.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/grunt.conf?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/grunt.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/grunt.conf Thu Nov 27 12:49:54 2014 @@ -44,7 +44,7 @@ $cfg = { 'num' => 2, 'pig' => "pwd", 'execonly' => 'mapred,tez', # don't have a clue what their cwd will be for local mode - 'expected_out_regex' => "hdfs:", + 'expected_out_regex' => "/user", 'rc' => 0 },{ @@ -55,10 +55,8 @@ $cfg = { },{ 'num' => 6, - 'pig' => q\ -sh touch /bin/bad -\, - ,'expected_err_regex' => "Permission denied" + 'pig' => "cat nonexist" + ,'expected_err_regex' => "does not exist" ,'rc' => 5 },{ Modified: pig/branches/spark/test/e2e/pig/tests/macro.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/macro.conf?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/macro.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/macro.conf Thu Nov 27 12:49:54 2014 @@ -373,10 +373,16 @@ $cfg = { A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); x = test(A); store x into ':OUTPATH:';#, + 'pig_win' => q#define CMD `perl -ne "print $_;"`; + define test(in) returns B { + $B = stream $in through CMD as (name, age, gpa); + } + + A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); + x = test(A); + store x into ':OUTPATH:';#, 'verify_pig_script' => q#A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); - define CMD `perl -ne 'print $_;'`; - B = stream A through CMD as (name, age, gpa); - store B into ':OUTPATH:';#, + store A into ':OUTPATH:';#, 'floatpostprocess' => 1, 'delimiter' => ' ' }, Modified: pig/branches/spark/test/e2e/pig/tests/multiquery.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/multiquery.conf?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/multiquery.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/multiquery.conf Thu Nov 27 12:49:54 2014 @@ -236,6 +236,14 @@ $cfg = { C = stream B through CMD2 as (name, age, gpa); D = JOIN B by name, C by name; store D into ':OUTPATH:.2'; #, + 'pig_win' => q# define CMD1 `perl -ne "print $_;"`; + define CMD2 `perl -ne "print $_;"`; + A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); + B = stream A through CMD1 as (name, age, gpa); + store B into ':OUTPATH:.1'; + C = stream B through CMD2 as (name, age, gpa); + D = JOIN B by name, C by name; + store D into ':OUTPATH:.2'; #, 'sql' => "select name, age, gpa from studenttab10k; select A.name, A.age, A.gpa, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);",
