Modified: pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java Wed Feb 22 09:43:41 2017 @@ -51,10 +51,9 @@ public class SpillableMemoryManager impl private static final Log log = LogFactory.getLog(SpillableMemoryManager.class); - private static final int ONE_GB = 1024 * 1024 * 1024; private static final int UNUSED_MEMORY_THRESHOLD_DEFAULT = 350 * 1024 * 1024; - private static final double MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7; - private static final double COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7; + private static final float MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7f; + private static final float COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7f; private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>(); // References to spillables with size @@ -86,7 +85,7 @@ public class SpillableMemoryManager impl // fraction of the total heap used for the threshold to determine // if we want to perform an extra gc before the spill - private double extraGCThresholdFraction = 0.05; + private float extraGCThresholdFraction = 0.05f; private long extraGCSpillSizeThreshold = 0L; private volatile boolean blockRegisterOnSpill = false; @@ -142,7 +141,7 @@ public class SpillableMemoryManager impl * @param unusedMemoryThreshold * Unused memory size below which we want to get notifications */ - private void configureMemoryThresholds(double memoryThresholdFraction, double collectionMemoryThresholdFraction, long unusedMemoryThreshold) { + private void configureMemoryThresholds(float memoryThresholdFraction, float 
collectionMemoryThresholdFraction, long unusedMemoryThreshold) { long tenuredHeapSize = tenuredHeap.getUsage().getMax(); memoryThresholdSize = (long)(tenuredHeapSize * memoryThresholdFraction); collectionThresholdSize = (long)(tenuredHeapSize * collectionMemoryThresholdFraction); @@ -184,8 +183,8 @@ public class SpillableMemoryManager impl spillFileSizeThreshold = conf.getLong("pig.spill.size.threshold", spillFileSizeThreshold); gcActivationSize = conf.getLong("pig.spill.gc.activation.size", gcActivationSize); - double memoryThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT); - double collectionThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT); + float memoryThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT); + float collectionThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT); long unusedMemoryThreshold = conf.getLong(PigConfiguration.PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE, UNUSED_MEMORY_THRESHOLD_DEFAULT); configureMemoryThresholds(memoryThresholdFraction, collectionThresholdFraction, unusedMemoryThreshold); } @@ -199,7 +198,7 @@ public class SpillableMemoryManager impl // used - heapmax/2 + heapmax/4 long toFree = 0L; if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) { - toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5); + toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5); //log String msg = "memory handler call- Usage threshold " @@ -211,7 +210,7 @@ public class SpillableMemoryManager impl log.debug(msg); } } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE - toFree = info.getUsage().getUsed() - memoryThresholdSize + 
(long)(memoryThresholdSize * 0.5); + toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5); //log String msg = "memory handler call - Collection threshold "
Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Wed Feb 22 09:43:41 2017 @@ -48,6 +48,7 @@ import org.apache.hadoop.io.compress.BZi import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.pig.FileInputLoadFunc; import org.apache.pig.FuncSpec; import org.apache.pig.LoadFunc; @@ -93,18 +94,10 @@ public class Utils { return System.getProperty("java.vendor").contains("IBM"); } - public static boolean isHadoop23() { - String version = org.apache.hadoop.util.VersionInfo.getVersion(); - if (version.matches("\\b0\\.23\\..+\\b")) - return true; - return false; - } - - public static boolean isHadoop2() { - String version = org.apache.hadoop.util.VersionInfo.getVersion(); - if (version.matches("\\b2\\.\\d+\\..+")) - return true; - return false; + public static boolean is64bitJVM() { + String arch = System.getProperties().getProperty("sun.arch.data.model", + System.getProperty("com.ibm.vm.bitmode")); + return arch != null && arch.equals("64"); } /** @@ -574,6 +567,11 @@ public class Utils { return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false); } + public static boolean isLocal(Configuration conf) { + return conf.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false) + || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false); + } + // PIG-3929 use parameter substitution for pig properties similar to Hadoop Configuration // Following code has been borrowed from Hadoop's Configuration#substituteVars private 
static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}"); @@ -697,4 +695,15 @@ public class Utils { DateTimeZone.setDefault(DateTimeZone.forID(dtzStr)); } } + + /** + * Add shutdown hook that runs before the FileSystem cache shutdown happens. + * + * @param hook code to execute during shutdown + * @param priority Priority over the FileSystem.SHUTDOWN_HOOK_PRIORITY + */ + public static void addShutdownHookWithPriority(Runnable hook, int priority) { + ShutdownHookManager.get().addShutdownHook(hook, + FileSystem.SHUTDOWN_HOOK_PRIORITY + priority); + } } Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Wed Feb 22 09:43:41 2017 @@ -118,6 +118,8 @@ public class AvroStorageDataConversionUt return ByteBuffer.wrap(((DataByteArray) o).get()); case FIXED: return new GenericData.Fixed(s, ((DataByteArray) o).get()); + case ENUM: + return new GenericData.EnumSymbol(s,o.toString()); default: if (DataType.findType(o) == DataType.DATETIME) { return ((DateTime) o).getMillis(); Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Wed Feb 22 
09:43:41 2017 @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import java.io.DataInput; import java.io.DataOutput; @@ -49,6 +50,7 @@ import java.util.Map; public final class AvroTupleWrapper <T extends IndexedRecord> implements Tuple { private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class); + private TupleFactory mTupleFactory = TupleFactory.getInstance(); /** * The Avro object wrapped in the pig Tuple. @@ -64,9 +66,9 @@ public final class AvroTupleWrapper <T e } @Override - public void write(final DataOutput o) throws IOException { - throw new IOException( - this.getClass().toString() + ".write called, but not implemented yet"); + public void write(DataOutput out) throws IOException { + Tuple t = mTupleFactory.newTupleNoCopy(getAll()); + t.write(out); } @SuppressWarnings("rawtypes") Modified: pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java Wed Feb 22 09:43:41 2017 @@ -98,13 +98,17 @@ public abstract class FilterExtractor { public void visit() throws FrontendException { // we will visit the leaf and it will recursively walk the plan LogicalExpression leaf = (LogicalExpression)originalPlan.getSources().get( 0 ); - // if the leaf is a unary operator it should be a FilterFunc in - // which case we don't try to extract partition filter conditions - if(leaf instanceof BinaryExpression) { - // recursively traverse the tree bottom up - // checkPushdown returns KeyState which is 
pair of LogicalExpression - BinaryExpression binExpr = (BinaryExpression)leaf; - KeyState finale = checkPushDown(binExpr); + + // recursively traverse the tree bottom up + // checkPushdown returns KeyState which is pair of LogicalExpression + KeyState finale = null; + if (leaf instanceof BinaryExpression) { + finale = checkPushDown((BinaryExpression) leaf); + } else if (leaf instanceof UnaryExpression) { + finale = checkPushDown((UnaryExpression) leaf); + } + + if (finale != null) { this.filterExpr = finale.filterExpr; this.pushdownExpr = getExpression(finale.pushdownExpr); } @@ -278,12 +282,22 @@ public abstract class FilterExtractor { if (unaryExpr instanceof CastExpression) { return checkPushDown(unaryExpr.getExpression()); } - if (unaryExpr instanceof IsNullExpression) { - state.pushdownExpr = unaryExpr; - state.filterExpr = null; - } else if (unaryExpr instanceof NotExpression) { - state.pushdownExpr = unaryExpr; - state.filterExpr = null; + // For IsNull, the child may not be a supported expression, e.g. MapLookupExpression. + // For NotExpression, the child, C, is broken into expressions P and F such that C = P AND F + // Consequently, NOT C = NOT P OR NOT F, which can't be expressed as an AND so both must be + // pushed or both used as a filter. + // For both cases, this expr can be pushed if and only if the entire child can be. 
+ if (unaryExpr instanceof IsNullExpression || unaryExpr instanceof NotExpression) { + KeyState childState = checkPushDown(unaryExpr.getExpression()); + if (childState.filterExpr == null) { + // only push down if the entire expression can be pushed + state.pushdownExpr = unaryExpr; + state.filterExpr = null; + } else { + removeFromFilteredPlan(childState.filterExpr); + state.filterExpr = addToFilterPlan(unaryExpr); + state.pushdownExpr = null; + } } else { state.filterExpr = addToFilterPlan(unaryExpr); state.pushdownExpr = null; Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Wed Feb 22 09:43:41 2017 @@ -323,6 +323,7 @@ public class ExpToPhyTranslationVisitor public void visit( CastExpression op ) throws FrontendException { POCast pCast = new POCast(new OperatorKey(DEFAULT_SCOPE, nodeGen .getNextNodeId(DEFAULT_SCOPE))); + pCast.addOriginalLocation(op.getFieldSchema().alias, op.getLocation()) ; // physOp.setAlias(op.getAlias()); currentPlan.add(pCast); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java (original) +++ 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java Wed Feb 22 09:43:41 2017 @@ -95,7 +95,8 @@ public class MapLookupExpression extends LogicalFieldSchema predFS = successor.getFieldSchema(); if (predFS!=null) { if (predFS.type==DataType.MAP && predFS.schema!=null) { - return (predFS.schema.getField(0)); + fieldSchema = predFS.schema.getField(0); + return fieldSchema; } else { fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Wed Feb 22 09:43:41 2017 @@ -37,6 +37,7 @@ public class LOGenerate extends LogicalR // to store uid in mUserDefinedSchema private List<LogicalSchema> mUserDefinedSchema = null; private List<LogicalSchema> outputPlanSchemas = null; + private List<LogicalSchema> expSchemas = null; // If LOGenerate generate new uid, cache it here. 
// This happens when expression plan does not have complete schema, however, // user give complete schema in ForEach statement in script @@ -71,6 +72,7 @@ public class LOGenerate extends LogicalR schema = new LogicalSchema(); outputPlanSchemas = new ArrayList<LogicalSchema>(); + expSchemas = new ArrayList<LogicalSchema>(); for(int i=0; i<outputPlans.size(); i++) { LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0); @@ -93,19 +95,17 @@ public class LOGenerate extends LogicalR fieldSchema = exp.getFieldSchema().deepCopy(); expSchema = new LogicalSchema(); - if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG)||!flattenFlags[i]) { + if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG && fieldSchema.type != DataType.MAP) || !flattenFlags[i]) { // if type is primitive, just add to schema - if (fieldSchema!=null) + if (fieldSchema != null) expSchema.addField(fieldSchema); - else - expSchema = null; } else { - // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator + // if bag/tuple/map don't have inner schema, after flatten, we don't have schema for the entire operator if (fieldSchema.schema==null) { expSchema = null; } else { - // if we come here, we get a BAG/Tuple with flatten, extract inner schema of the tuple as expSchema + // if we come here, we get a BAG/Tuple/Map with flatten, extract inner schema of the tuple as expSchema List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>(); if (flattenFlags[i]) { if (fieldSchema.type == DataType.BAG) { @@ -117,13 +117,23 @@ public class LOGenerate extends LogicalR fs.alias = fs.alias == null ? 
null : fieldSchema.alias + "::" + fs.alias; } } + } else if (fieldSchema.type == DataType.MAP) { + //should only contain 1 schemafield for Map's value + innerFieldSchemas = fieldSchema.schema.getFields(); + LogicalSchema.LogicalFieldSchema fsForValue = innerFieldSchemas.get(0); + fsForValue.alias = fieldSchema.alias + "::value"; + + LogicalSchema.LogicalFieldSchema fsForKey = new LogicalFieldSchema( + fieldSchema.alias + "::key" , null, DataType.CHARARRAY, fieldSchema.uid); + + expSchema.addField(fsForKey); } else { // DataType.TUPLE innerFieldSchemas = fieldSchema.schema.getFields(); for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) { fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias; } } - + for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) expSchema.addField(fs); } @@ -137,6 +147,7 @@ public class LOGenerate extends LogicalR if (expSchema!=null && expSchema.size()==0) expSchema = null; LogicalSchema planSchema = new LogicalSchema(); + expSchemas.add(expSchema); if (mUserDefinedSchemaCopy!=null) { LogicalSchema mergedSchema = new LogicalSchema(); // merge with userDefinedSchema @@ -146,12 +157,6 @@ public class LOGenerate extends LogicalR fs.stampFieldSchema(); mergedSchema.addField(new LogicalFieldSchema(fs)); } - for (LogicalFieldSchema fs : mergedSchema.getFields()) { - if (fs.type == DataType.NULL){ - //this is the use case where a new alias has been specified by user - fs.type = DataType.BYTEARRAY; - } - } } else { // Merge uid with the exp field schema @@ -163,8 +168,12 @@ public class LOGenerate extends LogicalR mergedSchema.mergeUid(expSchema); } - for (LogicalFieldSchema fs : mergedSchema.getFields()) + for (LogicalFieldSchema fs : mergedSchema.getFields()) { + if (fs.type==DataType.NULL) { + fs.type = DataType.BYTEARRAY; + } planSchema.addField(fs); + } } else { // if any plan do not have schema, the whole LOGenerate do not have schema if (expSchema==null) { @@ -310,4 +319,8 @@ public class LOGenerate 
extends LogicalR super.resetSchema(); outputPlanSchemas = null; } + + public List<LogicalSchema> getExpSchemas() { + return expSchemas; + } } Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java Wed Feb 22 09:43:41 2017 @@ -38,6 +38,7 @@ public class LOJoin extends LogicalRelat */ public static enum JOINTYPE { HASH, // Hash Join + BLOOM, // Bloom Join REPLICATED, // Fragment Replicated join SKEWED, // Skewed Join MERGE, // Sort Merge Join Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Feb 22 09:43:41 2017 @@ -1414,7 +1414,7 @@ public class LogToPhyTranslationVisitor return; } - else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){ + else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH || loj.getJoinType() == LOJoin.JOINTYPE.BLOOM){ POPackage poPackage = compileToLR_GR_PackTrio(loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans()); POForEach fe = compileFE4Flattening(innerFlags, scope, parallel, alias, location, inputs); currentPlan.add(fe); @@ -1425,7 +1425,20 @@ public class 
LogToPhyTranslationVisitor e.getErrorCode(),e.getErrorSource(),e); } logToPhyMap.put(loj, fe); - poPackage.getPkgr().setPackageType(PackageType.JOIN); + if (loj.getJoinType() == LOJoin.JOINTYPE.BLOOM) { + if (innerFlags.length == 2) { + if (innerFlags[0] == false && innerFlags[1] == false) { + throw new LogicalToPhysicalTranslatorException( + "Error at " + loj.getLocation() + " with alias "+ loj.getAlias() + + ". Bloom join cannot be used with a FULL OUTER join.", + 1109, + PigException.INPUT); + } + } + poPackage.getPkgr().setPackageType(PackageType.BLOOMJOIN); + } else { + poPackage.getPkgr().setPackageType(PackageType.JOIN); + } } translateSoftLinks(loj); } Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Wed Feb 22 09:43:41 2017 @@ -48,6 +48,7 @@ import org.apache.pig.newplan.logical.vi import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor; import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover; import org.apache.pig.newplan.logical.visitor.DuplicateForEachColumnRewriteVisitor; +import org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor; import org.apache.pig.newplan.logical.visitor.ImplicitSplitInsertVisitor; import org.apache.pig.newplan.logical.visitor.InputOutputFileValidatorVisitor; import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator; @@ -175,6 +176,7 @@ public class LogicalPlan extends BaseOpe new ColumnAliasConversionVisitor(this).visit(); new SchemaAliasVisitor(this).visit(); new ScalarVisitor(this, pigContext, scope).visit(); + 
new ForEachUserSchemaVisitor(this).visit(); // ImplicitSplitInsertVisitor has to be called before // DuplicateForEachColumnRewriteVisitor. Detail at pig-1766 @@ -189,6 +191,15 @@ public class LogicalPlan extends BaseOpe new TypeCheckingRelVisitor( this, collector).visit(); + + new UnionOnSchemaSetter(this).visit(); + new CastLineageSetter(this, collector).visit(); + new ScalarVariableValidator(this).visit(); + new StoreAliasSetter(this).visit(); + + // compute whether output data is sorted or not + new SortInfoSetter(this).visit(); + boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); if(aggregateWarning) { @@ -199,14 +210,6 @@ public class LogicalPlan extends BaseOpe } } - new UnionOnSchemaSetter(this).visit(); - new CastLineageSetter(this, collector).visit(); - new ScalarVariableValidator(this).visit(); - new StoreAliasSetter(this).visit(); - - // compute whether output data is sorted or not - new SortInfoSetter(this).visit(); - if (!(skipInputOutputValidation || pigContext.inExplain || pigContext.inDumpSchema)) { // Validate input/output file new InputOutputFileValidatorVisitor(this, pigContext).visit(); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Wed Feb 22 09:43:41 2017 @@ -150,6 +150,39 @@ public class LogicalSchema { } return true; } + + // Check if fs1 is equal to fs2 with regard to type + public static boolean typeMatch(LogicalFieldSchema fs1, LogicalFieldSchema fs2) { + if (fs1==null && fs2==null) { + return true; + } + if 
(fs1==null || fs2==null) { + return false; + } + if (fs1.type!=fs2.type) { + return false; + } + if (DataType.isComplex(fs1.type)) { + LogicalSchema s1 = fs1.schema; + LogicalSchema s2 = fs2.schema; + if (s1==null && s2==null) { + return true; + } + if (s1==null || s2==null) { /* exactly one sub-schema is missing: types differ */ + return false; + } + if (s1.size()!=s2.size()) { + return false; + } + for (int i=0;i<s1.size();i++) { + if (!typeMatch(s1.getField(i), s2.getField(i))) { + return false; + } + } + } + return true; + } + /** * Adds the uid from FieldSchema argument to this FieldSchema * If the argument is null, it stamps this FieldSchema with uid @@ -447,7 +480,23 @@ public class LogicalSchema { LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType); return mergedFS; } - + + public static boolean isEqualUnlessUnknown(LogicalFieldSchema fs1, LogicalFieldSchema fs2) throws FrontendException { + if (fs1.type == DataType.BYTEARRAY) { + return true; + } else if (fs2.type == DataType.BYTEARRAY) { + return true; + } else if (fs1.type == fs2.type) { + if (DataType.isComplex(fs1.type)) { + return LogicalSchema.isEqualUnlessUnknown(fs1.schema, fs2.schema); + } else { + return true; + } + } else { + return false; + } + } + /*** * Old Pig field schema does not require a tuple schema inside a bag; * Now it is required to have that; this method is to fill the gap @@ -770,7 +819,24 @@ public class LogicalSchema { } return mergedSchema; } - + + public static boolean isEqualUnlessUnknown(LogicalSchema s1, LogicalSchema s2) throws FrontendException { + if (s1 == null) { + return true; + } else if (s2 == null) { + return true; + } else if (s1.size() != s2.size()) { + return false; + } else { + for (int i=0;i<s1.size();i++) { + if (!LogicalFieldSchema.isEqualUnlessUnknown(s1.getField(i), s2.getField(i))) { /* field i of s1 against field i of s2 */ + return false; + } + } + return true; + } + } + public String toString(boolean verbose) { StringBuilder str = new StringBuilder(); Modified: 
pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java Wed Feb 22 09:43:41 2017 @@ -95,7 +95,7 @@ public class AddForEach extends WholePla } Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS); - if (outputUids==null) + if (outputUids==null || outputUids.size() == 0 ) return false; LogicalSchema schema = op.getSchema(); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java Wed Feb 22 09:43:41 2017 @@ -107,7 +107,7 @@ public class CastLineageSetter extends A if(inLoadFunc == null){ String msg = "Cannot resolve load function to use for casting from " + DataType.findTypeName(inType) + " to " + - DataType.findTypeName(outType) + ". 
"; + DataType.findTypeName(outType) + " at " + cast.getLocation() ; msgCollector.collect(msg, MessageType.Warning, PigWarning.NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY); }else { Added: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java (added) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.newplan.logical.visitor; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.pig.data.DataType; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.DependencyOrderWalker; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.logical.expression.CastExpression; +import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; +import org.apache.pig.newplan.logical.expression.ProjectExpression; +import org.apache.pig.newplan.logical.relational.LOForEach; +import org.apache.pig.newplan.logical.relational.LOGenerate; +import org.apache.pig.newplan.logical.relational.LOInnerLoad; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor; +import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema; + +public class ForEachUserSchemaVisitor extends LogicalRelationalNodesVisitor { + public ForEachUserSchemaVisitor(OperatorPlan plan) throws FrontendException { + super(plan, new DependencyOrderWalker(plan)); + } + + private static LogicalSchema replaceNullByteArraySchema( + LogicalSchema originalSchema, + LogicalSchema userSchema) throws FrontendException { + if( originalSchema == null && userSchema == null ) { + return null; + } else if ( originalSchema == null ) { + return userSchema.deepCopy(); + } else if ( userSchema == null ) { + return originalSchema.deepCopy(); + } + + LogicalSchema replacedSchema = new LogicalSchema(); + for (int i=0;i<originalSchema.size();i++) { + LogicalFieldSchema replacedFS = replaceNullByteArrayFieldSchema(originalSchema.getField(i), userSchema.getField(i)); + replacedSchema.addField(replacedFS); + } + return replacedSchema; + } + + private static LogicalFieldSchema replaceNullByteArrayFieldSchema( + 
LogicalFieldSchema originalFS, + LogicalFieldSchema userFS) throws FrontendException { + if( originalFS == null && userFS == null ) { + return null; + } else if ( originalFS == null ) { + return userFS.deepCopy(); + } else if ( userFS == null ) { + return originalFS.deepCopy(); + } + if ( originalFS.type==DataType.NULL + || originalFS.type==DataType.BYTEARRAY ) { + return userFS.deepCopy(); + } else if ( userFS.type==DataType.NULL + || userFS.type==DataType.BYTEARRAY ) { + // Use originalFS schema but keep the alias from userFS + return new LogicalFieldSchema(userFS.alias, originalFS.schema, originalFS.type); + } + + if ( !DataType.isSchemaType(originalFS.type) ) { + return userFS.deepCopy(); + } else { + LogicalSchema replacedSchema = replaceNullByteArraySchema(originalFS.schema, userFS.schema); + return new LogicalFieldSchema(userFS.alias, replacedSchema, userFS.type); + } + } + + private static boolean hasOnlyNullOrByteArraySchema (LogicalFieldSchema fs) { + if( DataType.isSchemaType(fs.type) ) { + if( fs.schema != null ) { + for (LogicalFieldSchema sub_fs : fs.schema.getFields() ) { + if( !hasOnlyNullOrByteArraySchema(sub_fs) ) { + return false; + } + } + } + } else if( fs.type != DataType.NULL && fs.type != DataType.BYTEARRAY ) { + return false; + } + return true; + } + + @Override + public void visit(LOForEach foreach) throws FrontendException { + LOGenerate generate = (LOGenerate)foreach.getInnerPlan().getSinks().get(0); + List<LogicalSchema> mExpSchemas = generate.getExpSchemas(); + List<LogicalSchema> mUserDefinedSchemas = generate.getUserDefinedSchema(); + + // Skip if no way to figure out schema (usually both expression schema and + // user defined schema are null) + if (foreach.getSchema()==null) { + return; + } + + if (mUserDefinedSchemas==null) { + return; + } + + boolean hasUserDefinedSchema = false; + for (LogicalSchema mUserDefinedSchema : mUserDefinedSchemas) { + if (mUserDefinedSchema!=null) { + hasUserDefinedSchema = true; + break; + } + } + + 
if (!hasUserDefinedSchema) { + return; + } + + if (mExpSchemas.size()!=mUserDefinedSchemas.size()) { + throw new FrontendException("Size mismatch: Get " + mExpSchemas.size() + + " mExpSchemas, but " + mUserDefinedSchemas.size() + " mUserDefinedSchemas", + 0, generate.getLocation()); + } + + LogicalPlan innerPlan = new LogicalPlan(); + LOForEach casterForEach = new LOForEach(plan); + casterForEach.setInnerPlan(innerPlan); + casterForEach.setAlias(foreach.getAlias()); + + List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>(); + LOGenerate gen = new LOGenerate(innerPlan, exps, null); + innerPlan.add(gen); + + int index = 0; + boolean needCast = false; + for(int i=0;i<mExpSchemas.size();i++) { + LogicalSchema mExpSchema = mExpSchemas.get(i); + LogicalSchema mUserDefinedSchema = mUserDefinedSchemas.get(i); + + // Use user defined schema to cast, this is the prevailing use case + if (mExpSchema==null) { + for (LogicalFieldSchema fs : mUserDefinedSchema.getFields()) { + if (hasOnlyNullOrByteArraySchema(fs)) { + addToExps(casterForEach, innerPlan, gen, exps, index, false, null); + } else { + addToExps(casterForEach, innerPlan, gen, exps, index, true, fs); + needCast = true; + } + index++; + } + continue; + } + + // No user defined schema, no need to cast + if (mUserDefinedSchema==null) { + for (int j=0;j<mExpSchema.size();j++) { + addToExps(casterForEach, innerPlan, gen, exps, index, false, null); + index++; + } + continue; + } + + // Expression has schema, but user also define schema, need cast only + // when there is a mismatch + if (mExpSchema.size()!=mUserDefinedSchema.size()) { + throw new FrontendException("Size mismatch: Cannot cast " + mExpSchema.size() + + " fields to " + mUserDefinedSchema.size(), 0, foreach.getLocation()); + } + + LogicalSchema replacedSchema = replaceNullByteArraySchema(mExpSchema,mUserDefinedSchema); + for (int j=0;j<mExpSchema.size();j++) { + LogicalFieldSchema mExpFieldSchema = mExpSchema.getField(j); + LogicalFieldSchema 
mUserDefinedFieldSchema = replacedSchema.getField(j); + + if (hasOnlyNullOrByteArraySchema(mUserDefinedFieldSchema) || + LogicalFieldSchema.typeMatch(mExpFieldSchema, mUserDefinedFieldSchema)) { + addToExps(casterForEach, innerPlan, gen, exps, index, false, null); + } else { + addToExps(casterForEach, innerPlan, gen, exps, index, true, mUserDefinedFieldSchema); + needCast = true; + } + index++; + } + } + + gen.setFlattenFlags(new boolean[index]); + if (needCast) { + // Insert the casterForEach into the plan and patch up the plan. + List <Operator> successorOps = plan.getSuccessors(foreach); + if (successorOps != null && successorOps.size() > 0){ + Operator next = plan.getSuccessors(foreach).get(0); + plan.insertBetween(foreach, casterForEach, next); + }else{ + plan.add(casterForEach); + plan.connect(foreach,casterForEach); + } + + // Since the explict cast is now inserted after the original foreach, + // throwing away the user defined "types" but keeping the user + // defined names from the original foreach. 
+ // 'generate' (LOGenerate) still holds the reference to this + // mUserDefinedSchemas + for( LogicalSchema mUserDefinedSchema : mUserDefinedSchemas ) { + resetTypeToNull( mUserDefinedSchema ); + } + } + } + + private void resetTypeToNull (LogicalSchema s1) { + if( s1 != null ) { + for (LogicalFieldSchema fs : s1.getFields()) { + if( DataType.isSchemaType(fs.type) ) { + resetTypeToNull(fs.schema); + } else { + fs.type = DataType.NULL; + } + } + } + } + + private void addToExps(LOForEach casterForEach, LogicalPlan innerPlan, LOGenerate gen, + List<LogicalExpressionPlan> exps, int index, boolean needCaster, LogicalFieldSchema fs) { + + LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, casterForEach, index); + innerPlan.add(innerLoad); + innerPlan.connect(innerLoad, gen); + + LogicalExpressionPlan exp = new LogicalExpressionPlan(); + + ProjectExpression prj = new ProjectExpression(exp, index, 0, gen); + exp.add(prj); + + if (needCaster) { + CastExpression cast = new CastExpression(exp, prj, new LogicalSchema.LogicalFieldSchema(fs)); + exp.add(cast); + } + exps.add(exp); + } +} Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Wed Feb 22 09:43:41 2017 @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; @@ -729,6 +730,44 @@ public class LineageFindRelVisitor exten } } + @Override + public void visit(UserFuncExpression op) 
throws FrontendException { + + if( op.getFieldSchema() == null ) { + return; + } + + FuncSpec funcSpec = null; + Class loader = instantiateCaster(op.getFuncSpec()); + List<LogicalExpression> arguments = op.getArguments(); + if ( loader != null ) { + // if evalFunc.getLoadCaster() returns, simply use that. + funcSpec = op.getFuncSpec(); + } else if (arguments.size() != 0 ) { + FuncSpec baseFuncSpec = null; + LogicalFieldSchema fs = arguments.get(0).getFieldSchema(); + if ( fs != null ) { + baseFuncSpec = uid2LoadFuncMap.get(fs.uid); + if( baseFuncSpec != null ) { + funcSpec = baseFuncSpec; + for(int i = 1; i < arguments.size(); i++) { + fs = arguments.get(i).getFieldSchema(); + if( fs == null || !haveIdenticalCasters(baseFuncSpec, uid2LoadFuncMap.get(fs.uid)) ) { + funcSpec = null; + break; + } + } + } + } + } + + if( funcSpec != null ) { + addUidLoadFuncToMap(op.getFieldSchema().uid, funcSpec); + // in case schema is nested, set funcSpec for all + setLoadFuncForUids(op.getFieldSchema().schema, funcSpec); + } + } + /** * if there is a null constant under casts, return it * @param rel @@ -770,6 +809,8 @@ public class LineageFindRelVisitor exten caster = ((LoadFunc)obj).getLoadCaster(); } else if (obj instanceof StreamToPig) { caster = ((StreamToPig)obj).getLoadCaster(); + } else if (obj instanceof EvalFunc) { + caster = ((EvalFunc)obj).getLoadCaster(); } else { throw new VisitorException("Invalid class type " + funcSpec.getClassName(), 2270, PigException.BUG ); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java (original) +++ 
pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java Wed Feb 22 09:43:41 2017 @@ -458,6 +458,7 @@ public class TypeCheckingExpVisitor exte collectCastWarning(node, arg.getType(), toFs.type, msgCollector); CastExpression cast = new CastExpression(plan, arg, toFs); + cast.setLocation(node.getLocation()); try { // disconnect cast and arg because the connection is already // added by cast constructor and insertBetween call is going @@ -490,7 +491,7 @@ public class TypeCheckingExpVisitor exte byte outType = cast.getType(); if(outType == DataType.BYTEARRAY && inType != outType) { int errCode = 1051; - String msg = "Cannot cast to bytearray"; + String msg = "Cannot cast from " + DataType.findTypeName(inType) + " to bytearray"; msgCollector.collect(msg, MessageType.Error) ; throw new TypeCheckerException(cast, msg, errCode, PigException.INPUT) ; } @@ -607,7 +608,7 @@ public class TypeCheckingExpVisitor exte // Matching schemas if we're working with tuples/bags if (DataType.isSchemaType(lhsType)) { try { - if(! binCond.getLhs().getFieldSchema().isEqual(binCond.getRhs().getFieldSchema())){ + if(!LogicalFieldSchema.isEqualUnlessUnknown(binCond.getLhs().getFieldSchema(), binCond.getRhs().getFieldSchema())){ int errCode = 1048; String msg = "Two inputs of BinCond must have compatible schemas." 
+ " left hand side: " + binCond.getLhs().getFieldSchema() Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Wed Feb 22 09:43:41 2017 @@ -351,7 +351,8 @@ public class TypeCheckingRelVisitor exte if (outFieldSchema.type != fs.type) { castNeededCounter++ ; - new CastExpression(genPlan, project, outFieldSchema); + CastExpression castexp = new CastExpression(genPlan, project, outFieldSchema); + castexp.setLocation(toOp.getLocation()); } generatePlans.add(genPlan) ; Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java Wed Feb 22 09:43:41 2017 @@ -21,6 +21,7 @@ package org.apache.pig.newplan.logical.v import java.util.ArrayList; import java.util.List; +import org.apache.pig.PigException; import org.apache.pig.data.DataType; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.Pair; @@ -110,9 +111,20 @@ public class UnionOnSchemaSetter extends } else { ProjectExpression projExpr = new ProjectExpression( exprPlan, genInputs.size(), 0, gen ); - if( fs.type != DataType.BYTEARRAY - 
&& opSchema.getField( pos ).type != fs.type ) { - new CastExpression( exprPlan, projExpr, fs ); + if( opSchema.getField( pos ).type != fs.type ) { + if( fs.type != DataType.BYTEARRAY ) { + CastExpression castexpr = new CastExpression( exprPlan, projExpr, fs ); + castexpr.setLocation(union.getLocation()); + } else { + int errCode = 1056; + String msg = "Union of incompatible types not allowed. " + + "Cannot cast from " + + DataType.findTypeName(opSchema.getField( pos ).type) + + " to bytearray for '" + + opSchema.getField( pos ).alias + + "'. Please typecast to compatible types before union." ; + throw new FrontendException(union, msg, errCode, PigException.INPUT) ; + } } genInputs.add( new LOInnerLoad( innerPlan, foreach, pos ) ); } Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Wed Feb 22 09:43:41 2017 @@ -34,6 +34,7 @@ import org.antlr.runtime.RecognitionExce import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; import org.apache.pig.LoadFunc; +import org.apache.pig.NonFSLoadFunc; import org.apache.pig.PigConfiguration; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.executionengine.ExecException; @@ -888,7 +889,7 @@ public class LogicalPlanBuilder { if (absolutePath == null) { absolutePath = loFunc.relativeToAbsolutePath( filename, QueryParserUtils.getCurrentDir( pigContext ) ); - if (absolutePath!=null) { + if (absolutePath!=null && !(loFunc instanceof NonFSLoadFunc)) { QueryParserUtils.setHdfsServers( absolutePath, pigContext ); } fileNameMap.put( fileNameKey, absolutePath ); @@ -1357,13 +1358,19 @@ public class 
LogicalPlanBuilder { return Long.parseLong( num ); } + /** + * Parse big integer formatted string (e.g. "1234567890123BI") into BigInteger object + */ static BigInteger parseBigInteger(String s) { - String num = s.substring( 0, s.length() - 1 ); + String num = s.substring( 0, s.length() - 2 ); return new BigInteger( num ); } + /** + * Parse big decimal formatted string (e.g. "123456.7890123BD") into BigDecimal object + */ static BigDecimal parseBigDecimal(String s) { - String num = s.substring( 0, s.length() - 1 ); + String num = s.substring( 0, s.length() - 2 ); return new BigDecimal( num ); } @@ -1781,6 +1788,8 @@ public class LogicalPlanBuilder { return JOINTYPE.REPLICATED; } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) { return LOJoin.JOINTYPE.HASH; + } else if( modifier.equalsIgnoreCase( "bloom" ) ) { + return LOJoin.JOINTYPE.BLOOM; } else if( modifier.equalsIgnoreCase( "skewed" ) ) { return JOINTYPE.SKEWED; } else if (modifier.equalsIgnoreCase("merge")) { @@ -1789,7 +1798,7 @@ public class LogicalPlanBuilder { return JOINTYPE.MERGESPARSE; } else { throw new ParserValidationException( intStream, loc, - "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." ); + "Only REPL, REPLICATED, HASH, BLOOM, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." 
); } } Modified: pig/branches/spark/src/org/apache/pig/parser/PigMacro.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/PigMacro.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/PigMacro.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/PigMacro.java Wed Feb 22 09:43:41 2017 @@ -168,14 +168,9 @@ class PigMacro { Map<String, String> paramVal = pc.getParamVal(); for (Map.Entry<String, String> e : pigContext.getParamVal().entrySet()) { - if (paramVal.containsKey(e.getKey())) { - throw new ParserException( - "Macro contains argument or return value " + e.getKey() + " which conflicts " + - "with a Pig parameter of the same name." - ); - } else { - paramVal.put(e.getKey(), e.getValue()); - } + // overwrite=false since macro parameters should have precedence + // over commandline parameters (if keys overlap) + pc.processOrdLine(e.getKey(), e.getValue(), false); } ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(pc); @@ -219,6 +214,7 @@ class PigMacro { try { result = parser.query(); } catch (RecognitionException e) { + e.line += startLine -1; String msg = (fileName == null) ? 
parser.getErrorHeader(e) : QueryParserUtils.generateErrorHeader(e, fileName); msg += " " + parser.getErrorMessage(e, parser.getTokenNames()); @@ -236,7 +232,7 @@ class PigMacro { if (!macroDefNodes.isEmpty()) { String fname = ((PigParserNode)ast).getFileName(); String msg = getErrorMessage(fname, ast.getLine(), - "Invalide macro definition", "macro '" + name + "Invalid macro definition", "macro '" + name + "' contains macro definition.\nmacro content: " + body); throw new ParserException(msg); @@ -273,6 +269,7 @@ class PigMacro { try { result2 = walker.query(); } catch (RecognitionException e) { + e.line += startLine - 1; String msg = walker.getErrorHeader(e) + " " + walker.getErrorMessage(e, walker.getTokenNames()); String msg2 = getErrorMessage(file, line, "Failed to mask macro '" Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original) +++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Wed Feb 22 09:43:41 2017 @@ -889,6 +889,8 @@ scalar : INTEGER | LONGINTEGER | FLOATNUMBER | DOUBLENUMBER + | BIGINTEGERNUMBER + | BIGDECIMALNUMBER | QUOTEDSTRING | NULL | TRUE Modified: pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java Wed Feb 22 09:43:41 2017 @@ -23,6 +23,10 @@ import java.net.URISyntaxException; import org.apache.pig.PigServer; import 
org.apache.pig.tools.DownloadResolver; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.hadoop.fs.Path; public class RegisterResolver { @@ -66,15 +70,24 @@ public class RegisterResolver { String scheme = uri.getScheme(); if (scheme != null) { scheme = scheme.toLowerCase(); + if (scheme.equals("ivy")) { + DownloadResolver downloadResolver = DownloadResolver.getInstance(); + return downloadResolver.downloadArtifact(uri, pigServer); + } + if (!hasFileSystemImpl(uri)) { + throw new ParserException("Invalid Scheme: " + uri.getScheme()); + } } - if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) { - return new URI[] { uri }; - } else if (scheme.equals("ivy")) { - DownloadResolver downloadResolver = DownloadResolver.getInstance(); - return downloadResolver.downloadArtifact(uri, pigServer); - } else { - throw new ParserException("Invalid Scheme: " + uri.getScheme()); - } + return new URI[] { uri }; + } + + /** + * @param uri + * @return True if the uri has valid file system implementation + */ + private boolean hasFileSystemImpl(URI uri) { + Configuration conf = ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties(), true); + return HadoopShims.hasFileSystemImpl(new Path(uri), conf); } /** Modified: pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java (original) +++ pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java Wed Feb 22 09:43:41 2017 @@ -75,12 +75,11 @@ public class SourceLocation { if (node != null) { InvocationPoint pt = node.getNextInvocationPoint(); while 
(pt != null) { - sb.append("\n"); sb.append("at expanding macro '" + pt.getMacro() + "' (" + pt.getFile() + ":" + pt.getLine() + ")"); pt = node.getNextInvocationPoint(); + sb.append("\n"); } - sb.append("\n"); } sb.append( "<" ); if( file != null && !file.isEmpty() ) Modified: pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original) +++ pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java Wed Feb 22 09:43:41 2017 @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.Mapper; @@ -35,6 +36,7 @@ import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase; @@ -75,9 +77,9 @@ import org.apache.pig.pen.util.LineageTr * */ public class LocalMapReduceSimulator { - + private MapReduceLauncher launcher = new MapReduceLauncher(); - + private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, 
PhysicalOperator>();; @SuppressWarnings("unchecked") @@ -88,12 +90,12 @@ public class LocalMapReduceSimulator { PigContext pc) throws PigException, IOException, InterruptedException { phyToMRMap.clear(); MROperPlan mrp = launcher.compile(php, pc); - + ConfigurationValidator.validatePigProperties(pc.getProperties()); Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties()); - + JobControlCompiler jcc = new JobControlCompiler(pc, conf); - + JobControl jc; int numMRJobsCompl = 0; DataBag input; @@ -106,6 +108,8 @@ public class LocalMapReduceSimulator { boolean needFileInput; final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>(); pc.getProperties().setProperty("pig.illustrating", "true"); + String jtIdentifier = "" + System.currentTimeMillis(); + int jobId = 0; while(mrp.size() != 0) { jc = jcc.compile(mrp, "Illustrator"); if(jc == null) { @@ -113,6 +117,7 @@ public class LocalMapReduceSimulator { } List<Job> jobs = jc.getWaitingJobs(); for (Job job : jobs) { + jobId++; jobConf = job.getJobConf(); FileLocalizer.setInitialized(false); ArrayList<ArrayList<OperatorKey>> inpTargets = @@ -123,14 +128,14 @@ public class LocalMapReduceSimulator { PigSplit split = null; List<POStore> stores = null; PhysicalOperator pack = null; - // revisit as there are new physical operators from MR compilation + // revisit as there are new physical operators from MR compilation if (!mro.mapPlan.isEmpty()) attacher.revisit(mro.mapPlan); if (!mro.reducePlan.isEmpty()) { attacher.revisit(mro.reducePlan); pack = mro.reducePlan.getRoots().get(0); } - + List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class); if (!mro.mapPlan.isEmpty()) { stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class); @@ -145,10 +150,10 @@ public class LocalMapReduceSimulator { for (POStore store : stores) { output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store)); } - + OutputAttacher oa = new OutputAttacher(mro.mapPlan, 
output); oa.visit(); - + if (!mro.reducePlan.isEmpty()) { oa = new OutputAttacher(mro.reducePlan, output); oa.visit(); @@ -168,6 +173,7 @@ public class LocalMapReduceSimulator { if (input != null) mro.mapPlan.remove(ld); } + int mapTaskId = 0; for (POLoad ld : lds) { // check newly generated data first input = output.get(ld.getLFile().getFileName()); @@ -180,7 +186,7 @@ public class LocalMapReduceSimulator { break; } } - } + } } needFileInput = (input == null); split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0); @@ -199,6 +205,7 @@ public class LocalMapReduceSimulator { context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split); } ((PigMapBase) map).setMapPlan(mro.mapPlan); + context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString()); map.run(context); } else { if ("true".equals(jobConf.get("pig.usercomparator"))) @@ -210,10 +217,11 @@ public class LocalMapReduceSimulator { Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map) .getIllustratorContext(jobConf, input, intermediateData, split); ((PigMapBase) map).setMapPlan(mro.mapPlan); + context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString()); map.run(context); } } - + if (!mro.reducePlan.isEmpty()) { if (pack instanceof POPackage) @@ -233,19 +241,20 @@ public class LocalMapReduceSimulator { } ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan); + context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString()); reduce.run(context); } for (PhysicalOperator key : mro.phyToMRMap.keySet()) for (PhysicalOperator value : mro.phyToMRMap.get(key)) phyToMRMap.put(key, value); } - - + + int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>()); - + numMRJobsCompl += removedMROp; } - + jcc.reset(); } @@ -256,7 +265,7 @@ 
public class LocalMapReduceSimulator { plan)); this.outputBuffer = output; } - + @Override public void visitUserFunc(POUserFunc userFunc) throws VisitorException { if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) { Modified: pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java Wed Feb 22 09:43:41 2017 @@ -38,6 +38,7 @@ import java.util.regex.Pattern; import org.apache.hadoop.util.Shell; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.util.UDFContext; import org.apache.pig.tools.pigstats.PigStats; /** @@ -127,7 +128,9 @@ public abstract class ScriptEngine { //protected static InputStream getScriptAsStream(String scriptPath) { InputStream is = null; File file = new File(scriptPath); - if (file.exists()) { + // In the frontend give preference to the local file. 
+ // In the backend, try the jar first + if (UDFContext.getUDFContext().isFrontend() && file.exists()) { try { is = new FileInputStream(file); } catch (FileNotFoundException e) { @@ -156,7 +159,14 @@ public abstract class ScriptEngine { } } } - + if (is == null && file.exists()) { + try { + is = new FileInputStream(file); + } catch (FileNotFoundException e) { + throw new IllegalStateException("could not find existing file "+scriptPath, e); + } + } + // TODO: discuss if we want to add logic here to load a script from HDFS if (is == null) { Modified: pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java Wed Feb 22 09:43:41 2017 @@ -95,7 +95,7 @@ public class JsFunction extends EvalFunc private void debugConvertPigToJS(int depth, String pigType, Object value, Schema schema) { if (LOG.isDebugEnabled()) { - LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + value + " using " + stringify(schema)); + LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + toString(value) + " using " + stringify(schema)); } } Modified: pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java (original) +++ pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java Wed Feb 22 09:43:41 2017 @@ -54,7 +54,7 @@ public class JythonFunction extends Eval 
try { f = JythonScriptEngine.getFunction(filename, functionName); this.function = f; - num_parameters = ((PyBaseCode) f.func_code).co_argcount; + num_parameters = ((PyBaseCode) f.__code__).co_argcount; PyObject outputSchemaDef = f.__findattr__("outputSchema".intern()); if (outputSchemaDef != null) { this.schema = Utils.getSchemaFromString(outputSchemaDef.toString()); @@ -105,7 +105,7 @@ public class JythonFunction extends Eval @Override public Object exec(Tuple tuple) throws IOException { try { - if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.func_code).varargs)) { + if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.__code__).varargs)) { // ignore input tuple PyObject out = function.__call__(); return JythonUtils.pythonToPig(out); Modified: pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java Wed Feb 22 09:43:41 2017 @@ -44,8 +44,6 @@ public class DownloadResolver { private static DownloadResolver downloadResolver = new DownloadResolver(); private DownloadResolver() { - System.setProperty("groovy.grape.report.downloads", "true"); - if (System.getProperty("grape.config") != null) { LOG.info("Using ivysettings file from " + System.getProperty("grape.config")); } else { Added: pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java 
(added) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.tools.grunt; + +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.util.Enumeration; + +import jline.console.ConsoleReader; + +/** Borrowed from jline.console.internal.ConsoleReaderInputStream. However, + * we cannot use ConsoleReaderInputStream directly since: + * 1. ConsoleReaderInputStream is not public + * 2. ConsoleReaderInputStream has a bug which does not deal with UTF-8 correctly + */ +public class ConsoleReaderInputStream extends SequenceInputStream { + private static InputStream systemIn = System.in; + + public static void setIn() throws IOException { + setIn(new ConsoleReader()); + } + + public static void setIn(final ConsoleReader reader) { + System.setIn(new ConsoleReaderInputStream(reader)); + } + + /** + * Restore the original {@link System#in} input stream. 
+ */ + public static void restoreIn() { + System.setIn(systemIn); + } + + public ConsoleReaderInputStream(final ConsoleReader reader) { + super(new ConsoleEnumeration(reader)); + } + + private static class ConsoleEnumeration implements Enumeration { + private final ConsoleReader reader; + private ConsoleLineInputStream next = null; + private ConsoleLineInputStream prev = null; + + public ConsoleEnumeration(final ConsoleReader reader) { + this.reader = reader; + } + + public Object nextElement() { + if (next != null) { + InputStream n = next; + prev = next; + next = null; + + return n; + } + + return new ConsoleLineInputStream(reader); + } + + public boolean hasMoreElements() { + // the last line was null + if ((prev != null) && (prev.wasNull == true)) { + return false; + } + + if (next == null) { + next = (ConsoleLineInputStream) nextElement(); + } + + return next != null; + } + } + + private static class ConsoleLineInputStream extends InputStream { + private final ConsoleReader reader; + private byte[] buffer = null; + private int index = 0; + private boolean eol = false; + protected boolean wasNull = false; + + public ConsoleLineInputStream(final ConsoleReader reader) { + this.reader = reader; + } + + public int read() throws IOException { + if (eol) { + return -1; + } + + if (buffer == null) { + buffer = reader.readLine().getBytes(); + } + + if (buffer == null) { + wasNull = true; + return -1; + } + + if (index >= buffer.length) { + eol = true; + return '\n'; // lines are ended with a newline + } + + return buffer[index++]; + } + } +} \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java (original) +++ 
pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java Wed Feb 22 09:43:41 2017 @@ -20,7 +20,7 @@ package org.apache.pig.tools.grunt; import java.io.BufferedReader; import java.util.ArrayList; -import jline.ConsoleReader; +import jline.console.ConsoleReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,8 +52,8 @@ public class Grunt public void setConsoleReader(ConsoleReader c) { - c.addCompletor(new PigCompletorAliases(pig)); - c.addCompletor(new PigCompletor()); + c.addCompleter(new PigCompletorAliases(pig)); + c.addCompleter(new PigCompletor()); parser.setConsoleReader(c); } Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Wed Feb 22 09:43:41 2017 @@ -26,7 +26,6 @@ import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.io.PrintStream; import java.io.Reader; import java.io.StringReader; @@ -42,8 +41,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import jline.ConsoleReader; -import jline.ConsoleReaderInputStream; +import jline.console.ConsoleReader; import org.apache.commons.io.output.NullOutputStream; import org.apache.commons.logging.Log; @@ -264,7 +262,7 @@ public class GruntParser extends PigScri public void prompt() { if (mInteractive) { - mConsoleReader.setDefaultPrompt("grunt> "); + mConsoleReader.setPrompt("grunt> "); } } @@ -516,8 +514,13 @@ public class GruntParser extends PigScri ConsoleReader reader; boolean interactive; - 
mPigServer.getPigContext().setParams(params); - mPigServer.getPigContext().setParamFiles(files); + PigContext pc = mPigServer.getPigContext(); + + if( !loadOnly ) { + pc.getPreprocessorContext().paramScopePush(); + } + pc.setParams(params); + pc.setParamFiles(files); try { FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script); @@ -528,7 +531,7 @@ public class GruntParser extends PigScri cmds = cmds.replaceAll("\t"," "); reader = new ConsoleReader(new ByteArrayInputStream(cmds.getBytes()), - new OutputStreamWriter(System.out)); + System.out); reader.setHistory(mConsoleReader.getHistory()); InputStream in = new ConsoleReaderInputStream(reader); inputReader = new BufferedReader(new InputStreamReader(in)); @@ -560,6 +563,9 @@ public class GruntParser extends PigScri if (interactive) { System.out.println(""); } + if( ! loadOnly ) { + pc.getPreprocessorContext().paramScopePop(); + } } @Override Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java Wed Feb 22 09:43:41 2017 @@ -33,9 +33,9 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import jline.Completor; +import jline.console.completer.Completer; -public class PigCompletor implements Completor { +public class PigCompletor implements Completer { private final Log log = LogFactory.getLog(getClass()); Set<String> candidates; static final String AUTOCOMPLETE_FILENAME = "autocomplete"; Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java Wed Feb 22 09:43:41 2017 @@ -26,12 +26,11 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.pig.PigServer; -import jline.Completor; +import jline.console.completer.Completer; -public class PigCompletorAliases implements Completor { +public class PigCompletorAliases implements Completer { private final Log log = LogFactory.getLog(getClass()); Set<String> keywords; PigServer pig; Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj (original) +++ pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj Wed Feb 22 09:43:41 2017 @@ -259,8 +259,11 @@ TOKEN : <PIGDEFAULT: "%default" > } + TOKEN : { + <REGISTER: "register"> : IN_REGISTER + | <IDENTIFIER: (<SPECIALCHAR>)*<LETTER>(<DIGIT> | <LETTER> | <SPECIALCHAR>)*> | <LITERAL: ("\"" ((~["\""])*("\\\"")?)* "\"")|("'" ((~["'"])*("\\\'")?)* "'") > @@ -276,7 +279,14 @@ TOKEN : <OTHER: (~["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"])+ > | <NOT_OTHER_CHAR: ["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"] > - +} + +<IN_REGISTER> MORE : { " " | "\t" | "\r" | "\n"} + +<IN_REGISTER> TOKEN: { + <PATH: 
(~["(", ")", ";", "\r", " ", "\t", "\n"])+> { + matchedToken.image = image.toString(); + }: DEFAULT } void Parse() throws IOException : {} @@ -288,6 +298,7 @@ void input() throws IOException : { String s; Token strTok = null; + Token strTok2 = null; } { strTok = <PIG> @@ -308,6 +319,20 @@ void input() throws IOException : { pc.validate(strTok.toString()); } ) | + strTok = <REGISTER> + strTok2 = <PATH> {} + { + // Adding a special case for register since it handles "/*" globbing + // and this conflicts with general multi-line comment "/* */". + // See the comment above on OTHERS on how tokenizer matches the longest + // match. Here, string next to "register" is treated as PATH TOKEN + // and therefore not consider "/*" as part of the comment + // (and avoid the longest match problem). + out.append(strTok.image); + String sub_line = pc.substitute(strTok2.image); + out.append(sub_line); + } + | s = paramString(){} { //process an ordinary pig line - perform substitution
