Added: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java (added) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.newplan.logical.visitor; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.pig.data.DataType; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.DependencyOrderWalker; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.logical.expression.CastExpression; +import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; +import org.apache.pig.newplan.logical.expression.ProjectExpression; +import org.apache.pig.newplan.logical.relational.LOForEach; +import org.apache.pig.newplan.logical.relational.LOGenerate; +import org.apache.pig.newplan.logical.relational.LOInnerLoad; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor; +import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema; + +public class ForEachUserSchemaVisitor extends LogicalRelationalNodesVisitor { + public ForEachUserSchemaVisitor(OperatorPlan plan) throws FrontendException { + super(plan, new DependencyOrderWalker(plan)); + } + + private static LogicalSchema replaceNullByteArraySchema( + LogicalSchema originalSchema, + LogicalSchema userSchema) throws FrontendException { + if( originalSchema == null && userSchema == null ) { + return null; + } else if ( originalSchema == null ) { + return userSchema.deepCopy(); + } else if ( userSchema == null ) { + return originalSchema.deepCopy(); + } + + LogicalSchema replacedSchema = new LogicalSchema(); + for (int i=0;i<originalSchema.size();i++) { + LogicalFieldSchema replacedFS = replaceNullByteArrayFieldSchema(originalSchema.getField(i), userSchema.getField(i)); + replacedSchema.addField(replacedFS); + } + return replacedSchema; + } + + private static LogicalFieldSchema replaceNullByteArrayFieldSchema( + LogicalFieldSchema originalFS, + LogicalFieldSchema userFS) throws FrontendException { + if( originalFS == null && userFS == null ) { + return null; + } else if ( originalFS == null ) { + return userFS.deepCopy(); + } else if ( userFS == null ) { + return originalFS.deepCopy(); + } + if ( originalFS.type==DataType.NULL + || originalFS.type==DataType.BYTEARRAY ) { + return userFS.deepCopy(); + } else if ( userFS.type==DataType.NULL + || userFS.type==DataType.BYTEARRAY ) { + // Use originalFS schema but keep the alias from userFS + return new LogicalFieldSchema(userFS.alias, originalFS.schema, originalFS.type); + } + + if ( !DataType.isSchemaType(originalFS.type) ) { + return userFS.deepCopy(); + } else { + LogicalSchema replacedSchema = replaceNullByteArraySchema(originalFS.schema, userFS.schema); + return new LogicalFieldSchema(userFS.alias, replacedSchema, userFS.type); + } + } + + private static boolean hasOnlyNullOrByteArraySchema (LogicalFieldSchema fs) { + if( DataType.isSchemaType(fs.type) ) { + if( fs.schema != null ) { + for (LogicalFieldSchema sub_fs : fs.schema.getFields() ) { + if( !hasOnlyNullOrByteArraySchema(sub_fs) ) { + return false; + } + } + } + } else if( fs.type != DataType.NULL && fs.type != DataType.BYTEARRAY ) { + return false; + } + return true; + } + + @Override + public void visit(LOForEach foreach) throws FrontendException { + LOGenerate generate = (LOGenerate)foreach.getInnerPlan().getSinks().get(0); + List<LogicalSchema> mExpSchemas = generate.getExpSchemas(); + List<LogicalSchema> mUserDefinedSchemas = generate.getUserDefinedSchema(); + + // Skip if no way to figure out schema (usually both expression schema and + // user defined schema are null) + if (foreach.getSchema()==null) { + return; + } + + if (mUserDefinedSchemas==null) { + return; + } + + boolean hasUserDefinedSchema = false; + for (LogicalSchema mUserDefinedSchema : mUserDefinedSchemas) { + if (mUserDefinedSchema!=null) { + hasUserDefinedSchema = true; + break; + } + } + + if (!hasUserDefinedSchema) { + return; + } + + if (mExpSchemas.size()!=mUserDefinedSchemas.size()) { + throw new FrontendException("Size mismatch: Get " + mExpSchemas.size() + + " mExpSchemas, but " + mUserDefinedSchemas.size() + " mUserDefinedSchemas", + 0, generate.getLocation()); + } + + LogicalPlan innerPlan = new LogicalPlan(); + LOForEach casterForEach = new LOForEach(plan); + casterForEach.setInnerPlan(innerPlan); + casterForEach.setAlias(foreach.getAlias()); + + List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>(); + LOGenerate gen = new LOGenerate(innerPlan, exps, null); + innerPlan.add(gen); + + int index = 0; + boolean needCast = false; + for(int i=0;i<mExpSchemas.size();i++) { + LogicalSchema mExpSchema = mExpSchemas.get(i); + LogicalSchema mUserDefinedSchema = mUserDefinedSchemas.get(i); + + // Use user defined schema to cast, this is the prevailing use case + if (mExpSchema==null) { + for (LogicalFieldSchema fs : mUserDefinedSchema.getFields()) { + if (hasOnlyNullOrByteArraySchema(fs)) { + addToExps(casterForEach, innerPlan, gen, exps, index, false, null); + } else { + addToExps(casterForEach, innerPlan, gen, exps, index, true, fs); + needCast = true; + } + index++; + } + continue; + } + + // No user defined schema, no need to cast + if (mUserDefinedSchema==null) { + for (int j=0;j<mExpSchema.size();j++) { + addToExps(casterForEach, innerPlan, gen, exps, index, false, null); + index++; + } + continue; + } + + // Expression has schema, but user also define schema, need cast only + // when there is a mismatch + if (mExpSchema.size()!=mUserDefinedSchema.size()) { + throw new FrontendException("Size mismatch: Cannot cast " + mExpSchema.size() + + " fields to " + mUserDefinedSchema.size(), 0, foreach.getLocation()); + } + + LogicalSchema replacedSchema = replaceNullByteArraySchema(mExpSchema,mUserDefinedSchema); + for (int j=0;j<mExpSchema.size();j++) { + LogicalFieldSchema mExpFieldSchema = mExpSchema.getField(j); + LogicalFieldSchema mUserDefinedFieldSchema = replacedSchema.getField(j); + + if (hasOnlyNullOrByteArraySchema(mUserDefinedFieldSchema) || + LogicalFieldSchema.typeMatch(mExpFieldSchema, mUserDefinedFieldSchema)) { + addToExps(casterForEach, innerPlan, gen, exps, index, false, null); + } else { + addToExps(casterForEach, innerPlan, gen, exps, index, true, mUserDefinedFieldSchema); + needCast = true; + } + index++; + } + } + + gen.setFlattenFlags(new boolean[index]); + if (needCast) { + // Insert the casterForEach into the plan and patch up the plan. + List <Operator> successorOps = plan.getSuccessors(foreach); + if (successorOps != null && successorOps.size() > 0){ + Operator next = plan.getSuccessors(foreach).get(0); + plan.insertBetween(foreach, casterForEach, next); + }else{ + plan.add(casterForEach); + plan.connect(foreach,casterForEach); + } + + // Since the explict cast is now inserted after the original foreach, + // throwing away the user defined "types" but keeping the user + // defined names from the original foreach. + // 'generate' (LOGenerate) still holds the reference to this + // mUserDefinedSchemas + for( LogicalSchema mUserDefinedSchema : mUserDefinedSchemas ) { + resetTypeToNull( mUserDefinedSchema ); + } + } + } + + private void resetTypeToNull (LogicalSchema s1) { + if( s1 != null ) { + for (LogicalFieldSchema fs : s1.getFields()) { + if( DataType.isSchemaType(fs.type) ) { + resetTypeToNull(fs.schema); + } else { + fs.type = DataType.NULL; + } + } + } + } + + private void addToExps(LOForEach casterForEach, LogicalPlan innerPlan, LOGenerate gen, + List<LogicalExpressionPlan> exps, int index, boolean needCaster, LogicalFieldSchema fs) { + + LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, casterForEach, index); + innerPlan.add(innerLoad); + innerPlan.connect(innerLoad, gen); + + LogicalExpressionPlan exp = new LogicalExpressionPlan(); + + ProjectExpression prj = new ProjectExpression(exp, index, 0, gen); + exp.add(prj); + + if (needCaster) { + CastExpression cast = new CastExpression(exp, prj, new LogicalSchema.LogicalFieldSchema(fs)); + exp.add(cast); + } + exps.add(exp); + } +}
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Fri Feb 24 08:19:42 2017 @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; @@ -729,6 +730,44 @@ public class LineageFindRelVisitor exten } } + @Override + public void visit(UserFuncExpression op) throws FrontendException { + + if( op.getFieldSchema() == null ) { + return; + } + + FuncSpec funcSpec = null; + Class loader = instantiateCaster(op.getFuncSpec()); + List<LogicalExpression> arguments = op.getArguments(); + if ( loader != null ) { + // if evalFunc.getLoadCaster() returns, simply use that. + funcSpec = op.getFuncSpec(); + } else if (arguments.size() != 0 ) { + FuncSpec baseFuncSpec = null; + LogicalFieldSchema fs = arguments.get(0).getFieldSchema(); + if ( fs != null ) { + baseFuncSpec = uid2LoadFuncMap.get(fs.uid); + if( baseFuncSpec != null ) { + funcSpec = baseFuncSpec; + for(int i = 1; i < arguments.size(); i++) { + fs = arguments.get(i).getFieldSchema(); + if( fs == null || !haveIdenticalCasters(baseFuncSpec, uid2LoadFuncMap.get(fs.uid)) ) { + funcSpec = null; + break; + } + } + } + } + } + + if( funcSpec != null ) { + addUidLoadFuncToMap(op.getFieldSchema().uid, funcSpec); + // in case schema is nested, set funcSpec for all + setLoadFuncForUids(op.getFieldSchema().schema, funcSpec); + } + } + /** * if there is a null constant under casts, return it * @param rel @@ -770,6 +809,8 @@ public class LineageFindRelVisitor exten caster = ((LoadFunc)obj).getLoadCaster(); } else if (obj instanceof StreamToPig) { caster = ((StreamToPig)obj).getLoadCaster(); + } else if (obj instanceof EvalFunc) { + caster = ((EvalFunc)obj).getLoadCaster(); } else { throw new VisitorException("Invalid class type " + funcSpec.getClassName(), 2270, PigException.BUG ); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java Fri Feb 24 08:19:42 2017 @@ -458,6 +458,7 @@ public class TypeCheckingExpVisitor exte collectCastWarning(node, arg.getType(), toFs.type, msgCollector); CastExpression cast = new CastExpression(plan, arg, toFs); + cast.setLocation(node.getLocation()); try { // disconnect cast and arg because the connection is already // added by cast constructor and insertBetween call is going @@ -490,7 +491,7 @@ public class TypeCheckingExpVisitor exte byte outType = cast.getType(); if(outType == DataType.BYTEARRAY && inType != outType) { int errCode = 1051; - String msg = "Cannot cast to bytearray"; + String msg = "Cannot cast from " + DataType.findTypeName(inType) + " to bytearray"; msgCollector.collect(msg, MessageType.Error) ; throw new TypeCheckerException(cast, msg, errCode, PigException.INPUT) ; } @@ -607,7 +608,7 @@ public class TypeCheckingExpVisitor exte // Matching schemas if we're working with tuples/bags if (DataType.isSchemaType(lhsType)) { try { - if(! binCond.getLhs().getFieldSchema().isEqual(binCond.getRhs().getFieldSchema())){ + if(!LogicalFieldSchema.isEqualUnlessUnknown(binCond.getLhs().getFieldSchema(), binCond.getRhs().getFieldSchema())){ int errCode = 1048; String msg = "Two inputs of BinCond must have compatible schemas." + " left hand side: " + binCond.getLhs().getFieldSchema() Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Fri Feb 24 08:19:42 2017 @@ -351,7 +351,8 @@ public class TypeCheckingRelVisitor exte if (outFieldSchema.type != fs.type) { castNeededCounter++ ; - new CastExpression(genPlan, project, outFieldSchema); + CastExpression castexp = new CastExpression(genPlan, project, outFieldSchema); + castexp.setLocation(toOp.getLocation()); } generatePlans.add(genPlan) ; Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java Fri Feb 24 08:19:42 2017 @@ -21,6 +21,7 @@ package org.apache.pig.newplan.logical.v import java.util.ArrayList; import java.util.List; +import org.apache.pig.PigException; import org.apache.pig.data.DataType; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.Pair; @@ -110,9 +111,20 @@ public class UnionOnSchemaSetter extends } else { ProjectExpression projExpr = new ProjectExpression( exprPlan, genInputs.size(), 0, gen ); - if( fs.type != DataType.BYTEARRAY - && opSchema.getField( pos ).type != fs.type ) { - new CastExpression( exprPlan, projExpr, fs ); + if( opSchema.getField( pos ).type != fs.type ) { + if( fs.type != DataType.BYTEARRAY ) { + CastExpression castexpr = new CastExpression( exprPlan, projExpr, fs ); + castexpr.setLocation(union.getLocation()); + } else { + int errCode = 1056; + String msg = "Union of incompatible types not allowed. " + + "Cannot cast from " + + DataType.findTypeName(opSchema.getField( pos ).type) + + " to bytearray for '" + + opSchema.getField( pos ).alias + + "'. Please typecast to compatible types before union." ; + throw new FrontendException(union, msg, errCode, PigException.INPUT) ; + } } genInputs.add( new LOInnerLoad( innerPlan, foreach, pos ) ); } Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Feb 24 08:19:42 2017 @@ -34,6 +34,7 @@ import org.antlr.runtime.RecognitionExce import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; import org.apache.pig.LoadFunc; +import org.apache.pig.NonFSLoadFunc; import org.apache.pig.PigConfiguration; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.executionengine.ExecException; @@ -888,7 +889,7 @@ public class LogicalPlanBuilder { if (absolutePath == null) { absolutePath = loFunc.relativeToAbsolutePath( filename, QueryParserUtils.getCurrentDir( pigContext ) ); - if (absolutePath!=null) { + if (absolutePath!=null && !(loFunc instanceof NonFSLoadFunc)) { QueryParserUtils.setHdfsServers( absolutePath, pigContext ); } fileNameMap.put( fileNameKey, absolutePath ); @@ -1357,13 +1358,19 @@ public class LogicalPlanBuilder { return Long.parseLong( num ); } + /** + * Parse big integer formatted string (e.g. "1234567890123BI") into BigInteger object + */ static BigInteger parseBigInteger(String s) { - String num = s.substring( 0, s.length() - 1 ); + String num = s.substring( 0, s.length() - 2 ); return new BigInteger( num ); } + /** + * Parse big decimal formatted string (e.g. "123456.7890123BD") into BigDecimal object + */ static BigDecimal parseBigDecimal(String s) { - String num = s.substring( 0, s.length() - 1 ); + String num = s.substring( 0, s.length() - 2 ); return new BigDecimal( num ); } @@ -1781,6 +1788,8 @@ public class LogicalPlanBuilder { return JOINTYPE.REPLICATED; } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) { return LOJoin.JOINTYPE.HASH; + } else if( modifier.equalsIgnoreCase( "bloom" ) ) { + return LOJoin.JOINTYPE.BLOOM; } else if( modifier.equalsIgnoreCase( "skewed" ) ) { return JOINTYPE.SKEWED; } else if (modifier.equalsIgnoreCase("merge")) { @@ -1789,7 +1798,7 @@ public class LogicalPlanBuilder { return JOINTYPE.MERGESPARSE; } else { throw new ParserValidationException( intStream, loc, - "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." ); + "Only REPL, REPLICATED, HASH, BLOOM, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." ); } } Modified: pig/branches/spark/src/org/apache/pig/parser/PigMacro.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/PigMacro.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/PigMacro.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/PigMacro.java Fri Feb 24 08:19:42 2017 @@ -168,14 +168,9 @@ class PigMacro { Map<String, String> paramVal = pc.getParamVal(); for (Map.Entry<String, String> e : pigContext.getParamVal().entrySet()) { - if (paramVal.containsKey(e.getKey())) { - throw new ParserException( - "Macro contains argument or return value " + e.getKey() + " which conflicts " + - "with a Pig parameter of the same name." - ); - } else { - paramVal.put(e.getKey(), e.getValue()); - } + // overwrite=false since macro parameters should have precedence + // over commandline parameters (if keys overlap) + pc.processOrdLine(e.getKey(), e.getValue(), false); } ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(pc); @@ -219,6 +214,7 @@ class PigMacro { try { result = parser.query(); } catch (RecognitionException e) { + e.line += startLine -1; String msg = (fileName == null) ? parser.getErrorHeader(e) : QueryParserUtils.generateErrorHeader(e, fileName); msg += " " + parser.getErrorMessage(e, parser.getTokenNames()); @@ -236,7 +232,7 @@ class PigMacro { if (!macroDefNodes.isEmpty()) { String fname = ((PigParserNode)ast).getFileName(); String msg = getErrorMessage(fname, ast.getLine(), - "Invalide macro definition", "macro '" + name + "Invalid macro definition", "macro '" + name + "' contains macro definition.\nmacro content: " + body); throw new ParserException(msg); @@ -273,6 +269,7 @@ class PigMacro { try { result2 = walker.query(); } catch (RecognitionException e) { + e.line += startLine - 1; String msg = walker.getErrorHeader(e) + " " + walker.getErrorMessage(e, walker.getTokenNames()); String msg2 = getErrorMessage(file, line, "Failed to mask macro '" Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Fri Feb 24 08:19:42 2017 @@ -889,6 +889,8 @@ scalar : INTEGER | LONGINTEGER | FLOATNUMBER | DOUBLENUMBER + | BIGINTEGERNUMBER + | BIGDECIMALNUMBER | QUOTEDSTRING | NULL | TRUE Modified: pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java Fri Feb 24 08:19:42 2017 @@ -23,6 +23,10 @@ import java.net.URISyntaxException; import org.apache.pig.PigServer; import org.apache.pig.tools.DownloadResolver; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.hadoop.fs.Path; public class RegisterResolver { @@ -66,15 +70,24 @@ public class RegisterResolver { String scheme = uri.getScheme(); if (scheme != null) { scheme = scheme.toLowerCase(); + if (scheme.equals("ivy")) { + DownloadResolver downloadResolver = DownloadResolver.getInstance(); + return downloadResolver.downloadArtifact(uri, pigServer); + } + if (!hasFileSystemImpl(uri)) { + throw new ParserException("Invalid Scheme: " + uri.getScheme()); + } } - if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) { - return new URI[] { uri }; - } else if (scheme.equals("ivy")) { - DownloadResolver downloadResolver = DownloadResolver.getInstance(); - return downloadResolver.downloadArtifact(uri, pigServer); - } else { - throw new ParserException("Invalid Scheme: " + uri.getScheme()); - } + return new URI[] { uri }; + } + + /** + * @param uri + * @return True if the uri has valid file system implementation + */ + private boolean hasFileSystemImpl(URI uri) { + Configuration conf = ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties(), true); + return HadoopShims.hasFileSystemImpl(new Path(uri), conf); } /** Modified: pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java Fri Feb 24 08:19:42 2017 @@ -75,12 +75,11 @@ public class SourceLocation { if (node != null) { InvocationPoint pt = node.getNextInvocationPoint(); while (pt != null) { - sb.append("\n"); sb.append("at expanding macro '" + pt.getMacro() + "' (" + pt.getFile() + ":" + pt.getLine() + ")"); pt = node.getNextInvocationPoint(); + sb.append("\n"); } - sb.append("\n"); } sb.append( "<" ); if( file != null && !file.isEmpty() ) Modified: pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original) +++ pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java Fri Feb 24 08:19:42 2017 @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.Mapper; @@ -35,6 +36,7 @@ import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase; @@ -75,9 +77,9 @@ import org.apache.pig.pen.util.LineageTr * */ public class LocalMapReduceSimulator { - + private MapReduceLauncher launcher = new MapReduceLauncher(); - + private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();; @SuppressWarnings("unchecked") @@ -88,12 +90,12 @@ public class LocalMapReduceSimulator { PigContext pc) throws PigException, IOException, InterruptedException { phyToMRMap.clear(); MROperPlan mrp = launcher.compile(php, pc); - + ConfigurationValidator.validatePigProperties(pc.getProperties()); Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties()); - + JobControlCompiler jcc = new JobControlCompiler(pc, conf); - + JobControl jc; int numMRJobsCompl = 0; DataBag input; @@ -106,6 +108,8 @@ public class LocalMapReduceSimulator { boolean needFileInput; final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>(); pc.getProperties().setProperty("pig.illustrating", "true"); + String jtIdentifier = "" + System.currentTimeMillis(); + int jobId = 0; while(mrp.size() != 0) { jc = jcc.compile(mrp, "Illustrator"); if(jc == null) { @@ -113,6 +117,7 @@ public class LocalMapReduceSimulator { } List<Job> jobs = jc.getWaitingJobs(); for (Job job : jobs) { + jobId++; jobConf = job.getJobConf(); FileLocalizer.setInitialized(false); ArrayList<ArrayList<OperatorKey>> inpTargets = @@ -123,14 +128,14 @@ public class LocalMapReduceSimulator { PigSplit split = null; List<POStore> stores = null; PhysicalOperator pack = null; - // revisit as there are new physical operators from MR compilation + // revisit as there are new physical operators from MR compilation if (!mro.mapPlan.isEmpty()) attacher.revisit(mro.mapPlan); if (!mro.reducePlan.isEmpty()) { attacher.revisit(mro.reducePlan); pack = mro.reducePlan.getRoots().get(0); } - + List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class); if (!mro.mapPlan.isEmpty()) { stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class); @@ -145,10 +150,10 @@ public class LocalMapReduceSimulator { for (POStore store : stores) { output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store)); } - + OutputAttacher oa = new OutputAttacher(mro.mapPlan, output); oa.visit(); - + if (!mro.reducePlan.isEmpty()) { oa = new OutputAttacher(mro.reducePlan, output); oa.visit(); @@ -168,6 +173,7 @@ public class LocalMapReduceSimulator { if (input != null) mro.mapPlan.remove(ld); } + int mapTaskId = 0; for (POLoad ld : lds) { // check newly generated data first input = output.get(ld.getLFile().getFileName()); @@ -180,7 +186,7 @@ public class LocalMapReduceSimulator { break; } } - } + } } needFileInput = (input == null); split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0); @@ -199,6 +205,7 @@ public class LocalMapReduceSimulator { context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split); } ((PigMapBase) map).setMapPlan(mro.mapPlan); + context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString()); map.run(context); } else { if ("true".equals(jobConf.get("pig.usercomparator"))) @@ -210,10 +217,11 @@ public class LocalMapReduceSimulator { Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map) .getIllustratorContext(jobConf, input, intermediateData, split); ((PigMapBase) map).setMapPlan(mro.mapPlan); + context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString()); map.run(context); } } - + if (!mro.reducePlan.isEmpty()) { if (pack instanceof POPackage) @@ -233,19 +241,20 @@ public class LocalMapReduceSimulator { } ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan); + context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString()); reduce.run(context); } for (PhysicalOperator key : mro.phyToMRMap.keySet()) for (PhysicalOperator value : mro.phyToMRMap.get(key)) phyToMRMap.put(key, value); } - - + + int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>()); - + numMRJobsCompl += removedMROp; } - + jcc.reset(); } @@ -256,7 +265,7 @@ public class LocalMapReduceSimulator { plan)); this.outputBuffer = output; } - + @Override public void visitUserFunc(POUserFunc userFunc) throws VisitorException { if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) { Modified: pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java Fri Feb 24 08:19:42 2017 @@ -38,6 +38,7 @@ import java.util.regex.Pattern; import org.apache.hadoop.util.Shell; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.util.UDFContext; import org.apache.pig.tools.pigstats.PigStats; /** @@ -127,7 +128,9 @@ public abstract class ScriptEngine { //protected static InputStream getScriptAsStream(String scriptPath) { InputStream is = null; File file = new File(scriptPath); - if (file.exists()) { + // In the frontend give preference to the local file. + // In the backend, try the jar first + if (UDFContext.getUDFContext().isFrontend() && file.exists()) { try { is = new FileInputStream(file); } catch (FileNotFoundException e) { @@ -156,7 +159,14 @@ public abstract class ScriptEngine { } } } - + if (is == null && file.exists()) { + try { + is = new FileInputStream(file); + } catch (FileNotFoundException e) { + throw new IllegalStateException("could not find existing file "+scriptPath, e); + } + } + // TODO: discuss if we want to add logic here to load a script from HDFS if (is == null) { Modified: pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java Fri Feb 24 08:19:42 2017 @@ -95,7 +95,7 @@ public class JsFunction extends EvalFunc private void debugConvertPigToJS(int depth, String pigType, Object value, Schema schema) { if (LOG.isDebugEnabled()) { - LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + value + " using " + stringify(schema)); + LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + toString(value) + " using " + stringify(schema)); } } Modified: pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java Fri Feb 24 08:19:42 2017 @@ -54,7 +54,7 @@ public class JythonFunction extends Eval try { f = JythonScriptEngine.getFunction(filename, functionName); this.function = f; - num_parameters = ((PyBaseCode) f.func_code).co_argcount; + num_parameters = ((PyBaseCode) f.__code__).co_argcount; PyObject outputSchemaDef = f.__findattr__("outputSchema".intern()); if (outputSchemaDef != null) { this.schema = Utils.getSchemaFromString(outputSchemaDef.toString()); @@ -105,7 +105,7 @@ public class JythonFunction extends Eval @Override public Object exec(Tuple tuple) throws IOException { try { - if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.func_code).varargs)) { + if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.__code__).varargs)) { // ignore input tuple PyObject out = function.__call__(); return JythonUtils.pythonToPig(out); Modified: pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java Fri Feb 24 08:19:42 2017 @@ -44,8 +44,6 @@ public class DownloadResolver { private static DownloadResolver downloadResolver = new DownloadResolver(); private DownloadResolver() { - System.setProperty("groovy.grape.report.downloads", "true"); - if (System.getProperty("grape.config") != null) { LOG.info("Using ivysettings file from " + System.getProperty("grape.config")); } else { Added: pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java (added) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.tools.grunt; + +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.util.Enumeration; + +import jline.console.ConsoleReader; + +/** Borrowed from jline.console.internal.ConsoleReaderInputStream. However, + * we cannot use ConsoleReaderInputStream directly since: + * 1. ConsoleReaderInputStream is not public + * 2. ConsoleReaderInputStream has a bug which does not deal with UTF-8 correctly + */ +public class ConsoleReaderInputStream extends SequenceInputStream { + private static InputStream systemIn = System.in; + + public static void setIn() throws IOException { + setIn(new ConsoleReader()); + } + + public static void setIn(final ConsoleReader reader) { + System.setIn(new ConsoleReaderInputStream(reader)); + } + + /** + * Restore the original {@link System#in} input stream. + */ + public static void restoreIn() { + System.setIn(systemIn); + } + + public ConsoleReaderInputStream(final ConsoleReader reader) { + super(new ConsoleEnumeration(reader)); + } + + private static class ConsoleEnumeration implements Enumeration { + private final ConsoleReader reader; + private ConsoleLineInputStream next = null; + private ConsoleLineInputStream prev = null; + + public ConsoleEnumeration(final ConsoleReader reader) { + this.reader = reader; + } + + public Object nextElement() { + if (next != null) { + InputStream n = next; + prev = next; + next = null; + + return n; + } + + return new ConsoleLineInputStream(reader); + } + + public boolean hasMoreElements() { + // the last line was null + if ((prev != null) && (prev.wasNull == true)) { + return false; + } + + if (next == null) { + next = (ConsoleLineInputStream) nextElement(); + } + + return next != null; + } + } + + private static class ConsoleLineInputStream extends InputStream { + private final ConsoleReader reader; + private byte[] buffer = null; + private int index = 0; + private boolean eol = false; + protected boolean wasNull = false; + + public ConsoleLineInputStream(final ConsoleReader reader) { + this.reader = reader; + } + + public int read() throws IOException { + if (eol) { + return -1; + } + + if (buffer == null) { + buffer = reader.readLine().getBytes(); + } + + if (buffer == null) { + wasNull = true; + return -1; + } + + if (index >= buffer.length) { + eol = true; + return '\n'; // lines are ended with a newline + } + + return buffer[index++]; + } + } +} \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java Fri Feb 24 08:19:42 2017 @@ -20,7 +20,7 @@ package org.apache.pig.tools.grunt; import java.io.BufferedReader; import java.util.ArrayList; -import jline.ConsoleReader; +import jline.console.ConsoleReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,8 +52,8 @@ public class Grunt public void setConsoleReader(ConsoleReader c) { - c.addCompletor(new PigCompletorAliases(pig)); - c.addCompletor(new PigCompletor()); + c.addCompleter(new PigCompletorAliases(pig)); + c.addCompleter(new PigCompletor()); parser.setConsoleReader(c); } Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Fri Feb 24 08:19:42 2017 @@ -26,7 +26,6 @@ import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.io.PrintStream; import java.io.Reader; import java.io.StringReader; @@ -42,8 +41,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import jline.ConsoleReader; -import jline.ConsoleReaderInputStream; +import jline.console.ConsoleReader; import org.apache.commons.io.output.NullOutputStream; import org.apache.commons.logging.Log; @@ -264,7 +262,7 @@ public class GruntParser extends PigScri public void prompt() { if (mInteractive) { - mConsoleReader.setDefaultPrompt("grunt> "); + mConsoleReader.setPrompt("grunt> "); } } @@ -516,8 +514,13 @@ public class GruntParser extends PigScri ConsoleReader reader; boolean interactive; - mPigServer.getPigContext().setParams(params); - mPigServer.getPigContext().setParamFiles(files); + PigContext pc = mPigServer.getPigContext(); + + if( !loadOnly ) { + pc.getPreprocessorContext().paramScopePush(); + } + pc.setParams(params); + pc.setParamFiles(files); try { FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script); @@ -528,7 +531,7 @@ public class GruntParser extends PigScri cmds = cmds.replaceAll("\t"," "); reader = new ConsoleReader(new ByteArrayInputStream(cmds.getBytes()), - new OutputStreamWriter(System.out)); + System.out); reader.setHistory(mConsoleReader.getHistory()); InputStream in = new ConsoleReaderInputStream(reader); inputReader = new BufferedReader(new InputStreamReader(in)); @@ -560,6 +563,9 @@ public class GruntParser extends PigScri if (interactive) { System.out.println(""); } + if( ! loadOnly ) { + pc.getPreprocessorContext().paramScopePop(); + } } @Override Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java Fri Feb 24 08:19:42 2017 @@ -33,9 +33,9 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import jline.Completor; +import jline.console.completer.Completer; -public class PigCompletor implements Completor { +public class PigCompletor implements Completer { private final Log log = LogFactory.getLog(getClass()); Set<String> candidates; static final String AUTOCOMPLETE_FILENAME = "autocomplete"; Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java Fri Feb 24 08:19:42 2017 @@ -26,12 +26,11 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.pig.PigServer; -import jline.Completor; +import jline.console.completer.Completer; -public class PigCompletorAliases implements Completor { +public class PigCompletorAliases implements Completer { private final Log log = LogFactory.getLog(getClass()); Set<String> keywords; PigServer pig; Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj (original) +++ pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj Fri Feb 24 08:19:42 2017 @@ -259,8 +259,11 @@ TOKEN : <PIGDEFAULT: "%default" > } + TOKEN : { + <REGISTER: "register"> : IN_REGISTER + | <IDENTIFIER: (<SPECIALCHAR>)*<LETTER>(<DIGIT> | <LETTER> | <SPECIALCHAR>)*> | <LITERAL: ("\"" ((~["\""])*("\\\"")?)* "\"")|("'" ((~["'"])*("\\\'")?)* "'") > @@ -276,7 +279,14 @@ TOKEN : <OTHER: (~["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"])+ > | <NOT_OTHER_CHAR: ["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"] > - +} + +<IN_REGISTER> MORE : { " " | "\t" | "\r" | "\n"} + +<IN_REGISTER> TOKEN: { + <PATH: (~["(", ")", ";", "\r", " ", "\t", "\n"])+> { + matchedToken.image = image.toString(); + }: DEFAULT } void Parse() throws IOException : {} @@ -288,6 +298,7 @@ void input() throws IOException : { String s; Token strTok = null; + Token strTok2 = null; } { strTok = <PIG> @@ -308,6 +319,20 @@ void input() throws IOException : { pc.validate(strTok.toString()); } ) | + strTok = <REGISTER> + strTok2 = <PATH> {} + { + // Adding a special case for register since it handles "/*" globbing + // and this conflicts with general multi-line comment "/* */". + // See the comment above on OTHERS on how tokenizer matches the longest + // match. Here, string next to "register" is treated as PATH TOKEN + // and therefore not consider "/*" as part of the comment + // (and avoid the longest match problem). + out.append(strTok.image); + String sub_line = pc.substitute(strTok2.image); + out.append(sub_line); + } + | s = paramString(){} { //process an ordinary pig line - perform substitution Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java Fri Feb 24 08:19:42 2017 @@ -27,6 +27,8 @@ import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.StringReader; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Hashtable; import java.util.List; import java.util.Map; @@ -40,20 +42,26 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.validator.BlackAndWhitelistFilter; import org.apache.pig.validator.PigCommandFilter; -import org.python.google.common.base.Preconditions; public class PreprocessorContext { - private Map<String, String> param_val; + private int tableinitsize = 10; + private Deque<Map<String,String>> param_val_stack; - // used internally to detect when a param is set multiple times, - // but it set with the same value so it's ok not to log a warning - private Map<String, String> param_source; - private PigContext pigContext; public Map<String, String> getParamVal() { - return param_val; + Map <String, String> ret = new Hashtable <String, String>(tableinitsize); + + //stack (deque) iterates LIFO + for (Map <String, String> map : param_val_stack ) { + for (Map.Entry<String, String> entry : map.entrySet()) { + if( ! ret.containsKey(entry.getKey()) ) { + ret.put(entry.getKey(), entry.getValue()); + } + } + } + return ret; } private final Log log = LogFactory.getLog(getClass()); @@ -63,24 +71,15 @@ public class PreprocessorContext { * smaller number only impacts performance */ public PreprocessorContext(int limit) { - param_val = new Hashtable<String, String> (limit); - param_source = new Hashtable<String, String> (limit); - } - - public PreprocessorContext(Map<String, String> paramVal) { - param_val = paramVal; - param_source = new Hashtable<String, String>(paramVal); + tableinitsize = limit; + param_val_stack = new ArrayDeque<Map<String,String>> (); + param_val_stack.push(new Hashtable<String, String> (tableinitsize)); } public void setPigContext(PigContext context) { this.pigContext = context; } - /* - public void processLiteral(String key, String val) { - processLiteral(key, val, true); - } */ - /** * This method generates parameter value by running specified command * @@ -102,20 +101,35 @@ public class PreprocessorContext { processOrdLine(key, val, true); } - /* - public void processLiteral(String key, String val, Boolean overwrite) { + public void paramScopePush() { + param_val_stack.push( new Hashtable<String, String> (tableinitsize) ); + } - if (param_val.containsKey(key)) { - if (overwrite) { - log.warn("Warning : Multiple values found for " + key + ". Using value " + val); - } else { - return; + public void paramScopePop() { + param_val_stack.pop(); + } + + public boolean paramval_containsKey(String key) { + for (Map <String, String> map : param_val_stack ) { + if( map.containsKey(key) ) { + return true; } } + return false; + } - String sub_val = substitute(val); - param_val.put(key, sub_val); - } */ + public String paramval_get(String key) { + for (Map <String, String> map : param_val_stack ) { + if( map.containsKey(key) ) { + return map.get(key); + } + } + return null; + } + + public void paramval_put(String key, String value) { + param_val_stack.peek().put(key, value); + } /** * This method generates parameter value by running specified command @@ -129,21 +143,21 @@ public class PreprocessorContext { filter.validate(PigCommandFilter.Command.SH); } - if (param_val.containsKey(key)) { - if (param_source.get(key).equals(val) || !overwrite) { - return; - } else { - log.warn("Warning : Multiple values found for " + key - + ". Using value " + val); - } + if (paramval_containsKey(key) && !overwrite) { + return; } - param_source.put(key, val); - val = val.substring(1, val.length()-1); //to remove the backticks String sub_val = substitute(val); sub_val = executeShellCommand(sub_val); - param_val.put(key, sub_val); + + if (paramval_containsKey(key) && !paramval_get(key).equals(sub_val) ) { + //(boolean overwrite is always true here) + log.warn("Warning : Multiple values found for " + key + " command `" + val + "`. " + + "Previous value " + paramval_get(key) + ", now using value " + sub_val); + } + + paramval_put(key, sub_val); } public void validate(String preprocessorCmd) throws FrontendException { @@ -175,18 +189,18 @@ public class PreprocessorContext { */ public void processOrdLine(String key, String val, Boolean overwrite) throws ParameterSubstitutionException { - if (param_val.containsKey(key)) { - if (param_source.get(key).equals(val) || !overwrite) { + String sub_val = substitute(val, key); + if (paramval_containsKey(key)) { + if (paramval_get(key).equals(sub_val) || !overwrite) { return; } else { - log.warn("Warning : Multiple values found for " + key + ". Using value " + val); + log.warn("Warning : Multiple values found for " + key + + ". Previous value " + paramval_get(key) + + ", now using value " + sub_val); } } - param_source.put(key, val); - - String sub_val = substitute(val, key); - param_val.put(key, sub_val); + paramval_put(key, sub_val); } @@ -318,7 +332,7 @@ public class PreprocessorContext { while (bracketKeyMatcher.find()) { if ( (bracketKeyMatcher.start() == 0) || (line.charAt( bracketKeyMatcher.start() - 1)) != '\\' ) { key = bracketKeyMatcher.group(1); - if (!(param_val.containsKey(key))) { + if (!(paramval_containsKey(key))) { String message; if (parentKey == null) { message = "Undefined parameter : " + key; @@ -327,7 +341,7 @@ public class PreprocessorContext { } throw new ParameterSubstitutionException(message); } - val = param_val.get(key); + val = paramval_get(key); if (val.contains("$")) { val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$"); } @@ -345,7 +359,7 @@ public class PreprocessorContext { // for escaped vars of the form \$<id> if ( (keyMatcher.start() == 0) || (line.charAt( keyMatcher.start() - 1)) != '\\' ) { key = keyMatcher.group(1); - if (!(param_val.containsKey(key))) { + if (!(paramval_containsKey(key))) { String message; if (parentKey == null) { message = "Undefined parameter : " + key; @@ -354,7 +368,7 @@ public class PreprocessorContext { } throw new ParameterSubstitutionException(message); } - val = param_val.get(key); + val = paramval_get(key); if (val.contains("$")) { val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$"); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Feb 24 08:19:42 2017 @@ -23,6 +23,7 @@ options { STATIC = false; // Case is ignored in keywords IGNORE_CASE = true; + // DEBUG_PARSER = true; JAVA_UNICODE_ESCAPE = true; } @@ -36,7 +37,7 @@ import java.util.List; import java.util.ArrayList; import org.apache.pig.impl.util.StringUtils; -import jline.ConsoleReader; +import jline.console.ConsoleReader; public abstract class PigScriptParser { @@ -217,7 +218,7 @@ TOKEN_MGR_DECLS : { { /*System.err.print(">> "); System.err.flush();*/ - consoleReader.setDefaultPrompt(">> "); + consoleReader.setPrompt(">> "); } } @@ -267,7 +268,7 @@ TOKEN_MGR_DECLS : { <"'"> {prevState = PIG_START;} : IN_STRING | <"`"> {prevState = PIG_START;} : IN_COMMAND | <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = PIG_START;} : SCHEMA_DEFINITION -| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+ > {prevState = PIG_START;} : GENERATE +| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+ > {prevState = PIG_START;} : GENERATE | <"{"> {pigBlockLevel = 1;} : IN_BLOCK | <"}"> {if (true) throw new TokenMgrError("Unmatched '}'", TokenMgrError.LEXICAL_ERROR);} | <";"> : PIG_END @@ -292,7 +293,8 @@ TOKEN_MGR_DECLS : { <IN_STRING> MORE : { - <"\\'"> + <"\\\\"> +| <"\\'"> | <"'"> { SwitchTo(prevState);} | <("\n" | "\r" | "\r\n")> {secondary_prompt();} | <(~[])> @@ -395,7 +397,7 @@ TOKEN_MGR_DECLS : { { <"\""> {prevState = IN_BLOCK;} : IN_DOUBLE_QUOTED_STRING | <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = IN_BLOCK;} : SCHEMA_DEFINITION -| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+> {prevState = IN_BLOCK;} : GENERATE +| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+> {prevState = IN_BLOCK;} : GENERATE | <"{"> {pigBlockLevel++;} | <"}"(";")?> {pigBlockLevel--; if (pigBlockLevel == 0) SwitchTo(PIG_END);} | <"'"> {prevState = IN_BLOCK;} : IN_STRING Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java Fri Feb 24 08:19:42 2017 @@ -147,6 +147,11 @@ final class EmbeddedPigStats extends Pig } @Override + public String getDisplayString() { + return null; + } + + @Override public long getProactiveSpillCountObjects() { throw new UnsupportedOperationException(); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java Fri Feb 24 08:19:42 2017 @@ -87,6 +87,11 @@ public class EmptyPigStats extends PigSt } @Override + public String getDisplayString() { + return null; + } + + @Override public JobGraph getJobGraph() { return emptyJobPlan; } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java Fri Feb 24 08:19:42 2017 @@ -134,6 +134,11 @@ public abstract class PigStats { } /** + * Returns the display message in pig grunt + */ + public abstract String getDisplayString(); + + /** * Returns the DAG of jobs spawned by the script */ public JobGraph getJobGraph() { @@ -265,6 +270,13 @@ public abstract class PigStats { return ScriptState.get().getPigVersion(); } + /** + * Returns the contents of the script that was run. + */ + public String getScript() { + return ScriptState.get().getScript(); + } + public String getScriptId() { return ScriptState.get().getId(); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Feb 24 08:19:42 2017 @@ -24,7 +24,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats; /** @@ -71,7 +71,7 @@ public class PigStatsUtil { */ @Deprecated public static final String FS_COUNTER_GROUP - = HadoopShims.getFsCounterGroupName(); + = MRPigStatsUtil.FS_COUNTER_GROUP; /** * Returns an empty PigStats object Use of this method is not advised as it Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Feb 24 08:19:42 2017 @@ -133,6 +133,8 @@ public abstract class ScriptState { MERGE_SPARSE_JOIN, REPLICATED_JOIN, SKEWED_JOIN, + BUILD_BLOOM, + FILTER_BLOOM, HASH_JOIN, COLLECTED_GROUP, MERGE_COGROUP, @@ -312,7 +314,7 @@ public abstract class ScriptState { maxScriptSize = Integer.valueOf(prop); } } - + this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize) : script; @@ -485,6 +487,10 @@ public abstract class ScriptState { public void visit(LOJoin op) { if (op.getJoinType() == JOINTYPE.HASH) { feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); + } else if (op.getJoinType() == JOINTYPE.BLOOM) { + feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); + feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal()); + feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal()); } else if (op.getJoinType() == JOINTYPE.MERGE) { feature.set(PIG_FEATURE.MERGE_JOIN.ordinal()); } else if (op.getJoinType() == JOINTYPE.MERGESPARSE) { @@ -506,6 +512,7 @@ public abstract class ScriptState { feature.set(PIG_FEATURE.RANK.ordinal()); } + @Override public void visit(LOSort op) { feature.set(PIG_FEATURE.ORDER_BY.ordinal()); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Fri Feb 24 08:19:42 2017 @@ -32,15 +32,16 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.TaskReport; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigCounters; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.impl.io.FileSpec; @@ -53,6 +54,8 @@ import org.apache.pig.tools.pigstats.Out import org.apache.pig.tools.pigstats.PigStats.JobGraph; import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter; +import org.python.google.common.collect.Lists; + /** * This class encapsulates the runtime statistics of a MapReduce job. @@ -281,7 +284,7 @@ public final class MRJobStats extends Jo void addCounters(Job job) { try { - counters = HadoopShims.getCounters(job); + counters = getCounters(job); } catch (IOException e) { LOG.warn("Unable to get job counters", e); } @@ -349,13 +352,13 @@ public final class MRJobStats extends Jo void addMapReduceStatistics(Job job) { Iterator<TaskReport> maps = null; try { - maps = HadoopShims.getTaskReports(job, TaskType.MAP); + maps = getTaskReports(job, TaskType.MAP); } catch (IOException e) { LOG.warn("Failed to get map task report", e); } Iterator<TaskReport> reduces = null; try { - reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE); + reduces = getTaskReports(job, TaskType.REDUCE); } catch (IOException e) { LOG.warn("Failed to get reduce task report", e); } @@ -515,4 +518,35 @@ public final class MRJobStats extends Jo inputs.add(is); } + public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException { + if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) { + LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID()); + return null; + } + Cluster cluster = new Cluster(job.getJobConf()); + try { + org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID()); + if (mrJob == null) { // In local mode, mrJob will be null + mrJob = job.getJob(); + } + org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type); + return Lists.newArrayList(reports).iterator(); + } catch (InterruptedException ir) { + throw new IOException(ir); + } + } + + public static Counters getCounters(Job job) throws IOException { + try { + Cluster cluster = new Cluster(job.getJobConf()); + org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID()); + if (mrJob == null) { // In local mode, mrJob will be null + mrJob = job.getJob(); + } + return new Counters(mrJob.getCounters()); + } catch (Exception ir) { + throw new IOException(ir); + } + } + } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Fri Feb 24 08:19:42 2017 @@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.classification.InterfaceAudience.Private; import org.apache.pig.impl.PigContext; import org.apache.pig.tools.pigstats.JobStats; @@ -51,7 +50,7 @@ public class MRPigStatsUtil extends PigS public static final String TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter"; public static final String FS_COUNTER_GROUP - = HadoopShims.getFsCounterGroupName(); + = "org.apache.hadoop.mapreduce.FileSystemCounter"; private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class); Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java Fri Feb 24 08:19:42 2017 @@ -207,13 +207,18 @@ public final class SimplePigStats extend } void display() { + LOG.info(getDisplayString()); + } + + @Override + public String getDisplayString() { if (returnCode == ReturnCode.UNKNOWN) { LOG.warn("unknown return code, can't display the results"); - return; + return ""; } if (pigContext == null) { LOG.warn("unknown exec type, don't display the results"); - return; + return ""; } SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); @@ -276,7 +281,7 @@ public final class SimplePigStats extend sb.append("\nJob DAG:\n").append(jobPlan.toString()); - LOG.info("Script Statistics: \n" + sb.toString()); + return "Script Statistics: \n" + sb.toString(); } void mapMROperToJob(MapReduceOper mro, Job job) { Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Fri Feb 24 08:19:42 2017 @@ -115,6 +115,12 @@ public class SparkPigStats extends PigSt } private void display() { + LOG.info(getDisplayString()); + } + + @Override + public String getDisplayString() { + StringBuilder sb = new StringBuilder(); Iterator<JobStats> iter = jobPlan.iterator(); while (iter.hasNext()) { SparkJobStats js = (SparkJobStats)iter.next(); @@ -122,22 +128,23 @@ public class SparkPigStats extends PigSt SparkOperator sparkOperator = jobSparkOperatorMap.get(js); js.setAlias(sparkOperator); } - LOG.info( "Spark Job [" + js.getJobId() + "] Metrics"); + sb.append("Spark Job [" + js.getJobId() + "] Metrics"); Map<String, Long> stats = js.getStats(); if (stats == null) { - LOG.info("No statistics found for job " + js.getJobId()); - return; + sb.append("No statistics found for job " + js.getJobId()); + return sb.toString(); } Iterator statIt = stats.entrySet().iterator(); while (statIt.hasNext()) { Map.Entry pairs = (Map.Entry)statIt.next(); - LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue()); + sb.append("\t" + pairs.getKey() + " : " + pairs.getValue()); } for (InputStats inputStat : js.getInputs()){ - LOG.info("\t"+inputStat.getDisplayString()); + sb.append("\t"+inputStat.getDisplayString()); } } + return sb.toString(); } @Override Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Fri Feb 24 08:19:42 2017 @@ -245,7 +245,11 @@ public class TezDAGStats extends JobStat OutputStats existingOut = outputsByLocation.get(output.getLocation()); // In case of multistore, bytesWritten is already calculated // from size of all the files in the output directory. - if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) { + // So use that if there is a combination of multistore and single store + if (output.getPOStore().isMultiStore()) { + existingOut.setBytes(output.getBytes()); + existingOut.setPOStore(output.getPOStore()); + } else if (!existingOut.getPOStore().isMultiStore() && output.getBytes() > -1) { long bytes = existingOut.getBytes() > -1 ? (existingOut.getBytes() + output.getBytes()) : output.getBytes(); Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Fri Feb 24 08:19:42 2017 @@ -117,6 +117,11 @@ public class TezPigScriptStats extends P } private void display() { + LOG.info(getDisplayString()); + } + + @Override + public String getDisplayString() { SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); StringBuilder sb = new StringBuilder(); sb.append("\n"); @@ -170,7 +175,7 @@ public class TezPigScriptStats extends P for (OutputStats os : getOutputStats()) { sb.append(os.getDisplayString().trim()).append("\n"); } - LOG.info("Script Statistics:\n" + sb.toString()); + return "Script Statistics:\n" + sb.toString(); } /** Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Fri Feb 24 08:19:42 2017 @@ -275,6 +275,12 @@ public class TezScriptState extends Scri if (tezOp.isRegularJoin()) { feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); } + if (tezOp.isBuildBloom()) { + feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal()); + } + if (tezOp.isFilterBloom()) { + feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal()); + } if (tezOp.isUnion()) { feature.set(PIG_FEATURE.UNION.ordinal()); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Feb 24 08:19:42 2017 @@ -22,6 +22,7 @@ import static org.apache.pig.tools.pigst import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -289,13 +290,19 @@ public class TezVertexStats extends JobS } // Split followed by union will have multiple stores writing to same location - Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>(); + Map<String, List<POStore>> uniqueOutputs = new HashMap<String, List<POStore>>(); for (POStore sto : stores) { POStoreTez store = (POStoreTez) sto; - uniqueOutputs.put(store.getOutputKey(), store); + List<POStore> stores = uniqueOutputs.get(store.getOutputKey()); + if (stores == null) { + stores = new ArrayList<POStore>(); + } + stores.add(store); + uniqueOutputs.put(store.getOutputKey(), stores); } - for (POStore sto : uniqueOutputs.values()) { + for (List<POStore> stores : uniqueOutputs.values()) { + POStore sto = stores.get(0); if (sto.isTmpStore()) { continue; } @@ -304,11 +311,16 @@ public class TezVertexStats extends JobS String filename = sto.getSFile().getFileName(); if (counters != null) { if (msGroup != null) { - Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto)); - if (n != null) records = n; - } - if (records == -1) { - records = outputRecords; + long n = 0; + Long val = null; + for (POStore store : stores) { + val = msGroup.get(PigStatsUtil.getMultiStoreCounterName(store)); + // Tez removes 0 value counters for efficiency. + if (val != null) { + n += val; + }; + } + records = n; } if (isSuccessful() && records == -1) { // Tez removes 0 value counters for efficiency. @@ -338,13 +350,13 @@ public class TezVertexStats extends JobS @Override @Deprecated public int getNumberMaps() { - return this.isMapOpts ? numTasks : -1; + return this.isMapOpts ? numTasks : 0; } @Override @Deprecated public int getNumberReduces() { - return this.isMapOpts ? -1 : numTasks; + return this.isMapOpts ? 0 : numTasks; } @Override @@ -386,25 +398,25 @@ public class TezVertexStats extends JobS @Override @Deprecated public long getMapInputRecords() { - return this.isMapOpts ? numInputRecords : -1; + return this.isMapOpts ? numInputRecords : 0; } @Override @Deprecated public long getMapOutputRecords() { - return this.isMapOpts ? numOutputRecords : -1; + return this.isMapOpts ? numOutputRecords : 0; } @Override @Deprecated public long getReduceInputRecords() { - return this.isMapOpts ? -1 : numInputRecords; + return numReduceInputRecords; } @Override @Deprecated public long getReduceOutputRecords() { - return this.isMapOpts ? -1 : numOutputRecords; + return this.isMapOpts ? 0 : numOutputRecords; } @Override
