Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Fri Mar 4 18:17:39 2016 @@ -206,7 +206,9 @@ public class CombinerOptimizerUtil { // fix projection and function time for algebraic functions in reduce foreach for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) { setProjectInput(op2plan.first, op2plan.second, op2newpos.get(op2plan.first)); + byte resultType = op2plan.first.getResultType(); ((POUserFunc)op2plan.first).setAlgebraicFunction(POUserFunc.FINAL); + op2plan.first.setResultType(resultType); } // we have modified the foreach inner plans - so set them again @@ -251,7 +253,7 @@ public class CombinerOptimizerUtil { POLocalRearrange mlr = getNewRearrange(rearrange); POPartialAgg mapAgg = null; if (doMapAgg) { - mapAgg = createPartialAgg(cfe); + mapAgg = createPartialAgg(cfe, isGroupAll(rearrange)); } // A specialized local rearrange operator will replace @@ -285,16 +287,31 @@ public class CombinerOptimizerUtil { } } + private static boolean isGroupAll(POLocalRearrange lr) { + // B: Local Rearrange[tuple]{chararray}(false) - scope-3 -> scope-14 + // | | + // | Constant(all) - scope-4 + boolean isGroupAll = false; + if (lr.getPlans().size() == 1) { + PhysicalPlan plan = lr.getPlans().get(0); + if (plan.getKeys().size() == 1 && (plan.getRoots().get(0) instanceof ConstantExpression)) { + ConstantExpression constExpr = (ConstantExpression) plan.getRoots().get(0); + isGroupAll = ("all").equals(constExpr.getValue()); + } + } + 
return isGroupAll; + } + /** * Translate POForEach in combiner into a POPartialAgg * @param combineFE * @return partial aggregate operator * @throws CloneNotSupportedException */ - private static POPartialAgg createPartialAgg(POForEach combineFE) throws CloneNotSupportedException { + private static POPartialAgg createPartialAgg(POForEach combineFE, boolean isGroupAll) throws CloneNotSupportedException { String scope = combineFE.getOperatorKey().scope; POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope, - NodeIdGenerator.getGenerator().getNextNodeId(scope))); + NodeIdGenerator.getGenerator().getNextNodeId(scope)), isGroupAll); poAgg.addOriginalLocation(combineFE.getAlias(), combineFE.getOriginalLocations()); poAgg.setResultType(combineFE.getResultType());
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Mar 4 18:17:39 2016 @@ -16,8 +16,6 @@ */ package org.apache.pig.backend.hadoop.hbase; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -48,8 +46,8 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; @@ -88,6 +86,7 @@ import org.apache.pig.ResourceSchema.Res import org.apache.pig.StoreFuncInterface; import org.apache.pig.StoreResources; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder; import org.apache.pig.builtin.FuncUtils; import org.apache.pig.builtin.Utf8StorageConverter; @@ -205,6 +204,7 @@ public class HBaseStorage extends LoadFu validOptions_.addOption("cacheBlocks", true, "Set whether blocks should be cached for the scan"); validOptions_.addOption("caching", true, "Number of rows scanners should cache"); validOptions_.addOption("limit", true, "Per-region limit"); + 
validOptions_.addOption("maxResultsPerColumnFamily", true, "Limit the maximum number of values returned per row per column family"); validOptions_.addOption("delim", true, "Column delimiter"); validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when parsing columns"); validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " + @@ -251,6 +251,7 @@ public class HBaseStorage extends LoadFu * <li>-lte=maxKeyVal * <li>-regex=match regex on KeyVal * <li>-limit=numRowsPerRegion max number of rows to retrieve per region + * <li>-maxResultsPerColumnFamily= Limit the maximum number of values returned per row per column family * <li>-delim=char delimiter to use when parsing column names (default is space or comma) * <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true) * <li>-cacheBlocks=(true|false) Set whether blocks should be cached for the scan (default false). @@ -275,7 +276,7 @@ public class HBaseStorage extends LoadFu configuredOptions_ = parser_.parse(validOptions_, optsArr); } catch (ParseException e) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ ); + formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-maxResultsPerColumnFamily] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ ); throw e; } @@ -476,6 +477,10 @@ public class HBaseStorage extends LoadFu if (configuredOptions_.hasOption("timestamp")){ scan.setTimeStamp(timestamp_); } + if (configuredOptions_.hasOption("maxResultsPerColumnFamily")){ + int maxResultsPerColumnFamily_ = 
Integer.valueOf(configuredOptions_.getOptionValue("maxResultsPerColumnFamily")); + scan.setMaxResultsPerColumnFamily(maxResultsPerColumnFamily_); + } // if the group of columnInfos for this family doesn't contain a prefix, we don't need // to set any filters, we can just call addColumn or addFamily. See javadocs below. @@ -818,7 +823,9 @@ public class HBaseStorage extends LoadFu List<Class> classList = new ArrayList<Class>(); classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server - classList.add(com.google.common.collect.Lists.class); // guava + if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava + classList.add(com.google.common.collect.Lists.class); // guava + } classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper // Additional jars that are specific to v0.95.0+ addClassToList("org.cloudera.htrace.Trace", classList); // htrace @@ -1052,7 +1059,8 @@ public class HBaseStorage extends LoadFu * @throws IOException */ public Delete createDelete(Object key, byte type, long timestamp) throws IOException { - Delete delete = new Delete(objToBytes(key, type), timestamp); + Delete delete = new Delete(objToBytes(key, type)); + delete.setTimestamp(timestamp); if(noWAL_) { delete.setWriteToWAL(false); @@ -1188,7 +1196,7 @@ public class HBaseStorage extends LoadFu this.requiredFieldList = requiredFieldList; if (requiredFieldList != null && requiredFields.size() > (columnInfo_.size() + colOffset)) { - throw new FrontendException("The list of columns to project from HBase is larger than HBaseStorage is configured to load."); + throw new FrontendException("The list of columns to project from HBase (" + requiredFields.size() + ") is larger than HBaseStorage is configured to load (" + (columnInfo_.size() + colOffset) + ")."); } // remember the projection @@ -1215,9 +1223,10 @@ public class 
HBaseStorage extends LoadFu return new RequiredFieldResponse(true); } + @Override public void ensureAllKeyInstancesInSameSplit() throws IOException { - /** - * no-op because hbase keys are unique + /** + * no-op because hbase keys are unique * This will also work with things like DelimitedKeyPrefixRegionSplitPolicy * if you need a partial key match to be included in the split */ @@ -1232,7 +1241,7 @@ public class HBaseStorage extends LoadFu throw new RuntimeException("LoadFunc expected split of type TableSplit but was " + split.getClass().getName()); } } - + /** * Class to encapsulate logic around which column names were specified in each Modified: pig/branches/spark/src/org/apache/pig/builtin/AVG.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AVG.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AVG.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AVG.java Fri Mar 4 18:17:39 2016 @@ -27,6 +27,7 @@ import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; @@ -34,7 +35,6 @@ import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.backend.executionengine.ExecException; /** @@ -79,14 +79,17 @@ public class AVG extends EvalFunc<Double } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -230,7 +233,7 @@ public 
class AVG extends EvalFunc<Double DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } Modified: pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java Fri Mar 4 18:17:39 2016 @@ -18,8 +18,8 @@ package org.apache.pig.builtin; import java.io.IOException; -import java.util.Iterator; import java.math.BigDecimal; +import java.util.Iterator; import org.apache.pig.Accumulator; import org.apache.pig.PigException; @@ -88,7 +88,7 @@ public abstract class AlgebraicBigDecima DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } BigDecimal sofar = AlgebraicBigDecimalMathBase.getSeed(opProvider.getOp()); Modified: pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java Fri Mar 4 18:17:39 2016 @@ -18,8 +18,8 @@ package org.apache.pig.builtin; import java.io.IOException; -import java.util.Iterator; import java.math.BigInteger; +import 
java.util.Iterator; import org.apache.pig.Accumulator; import org.apache.pig.PigException; @@ -88,7 +88,7 @@ public abstract class AlgebraicBigIntege DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } BigInteger sofar = AlgebraicBigIntegerMathBase.getSeed(opProvider.getOp()); Modified: pig/branches/spark/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java Fri Mar 4 18:17:39 2016 @@ -67,7 +67,7 @@ public abstract class AlgebraicByteArray DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } double sofar = AlgebraicByteArrayMathBase.getSeed(opProvider.getOp()); Modified: pig/branches/spark/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java Fri Mar 4 18:17:39 2016 @@ -64,7 +64,7 @@ public abstract class AlgebraicDoubleMat DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL // this is in compliance with 
SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } double sofar = AlgebraicDoubleMathBase.getSeed(opProvider.getOp()); Modified: pig/branches/spark/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java Fri Mar 4 18:17:39 2016 @@ -64,7 +64,7 @@ public abstract class AlgebraicFloatMath DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } Float sofar = AlgebraicFloatMathBase.getSeed(opProvider.getOp()); Modified: pig/branches/spark/src/org/apache/pig/builtin/AlgebraicIntMathBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicIntMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicIntMathBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicIntMathBase.java Fri Mar 4 18:17:39 2016 @@ -64,7 +64,7 @@ public abstract class AlgebraicIntMathBa DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } int sofar = AlgebraicIntMathBase.getSeed(opProvider.getOp()); Modified: pig/branches/spark/src/org/apache/pig/builtin/AlgebraicLongMathBase.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicLongMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicLongMathBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicLongMathBase.java Fri Mar 4 18:17:39 2016 @@ -64,7 +64,7 @@ public abstract class AlgebraicLongMathB DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } Long sofar = AlgebraicLongMathBase.getSeed(opProvider.getOp()); Modified: pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java Fri Mar 4 18:17:39 2016 @@ -29,6 +29,7 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; import org.apache.avro.Schema.Type; import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericContainer; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.mapred.AvroInputFormat; @@ -120,6 +121,8 @@ public class AvroStorage extends LoadFun * <li><code>-schemafile</code> Specifies URL for avro schema file * from which to read the input schema (can be local file, hdfs, * url, etc).</li> + * <li><code>-schemaclass</code> Specifies fully qualified class name for avro + * class in your classpath which implements GenericContainer.</li> * <li><code>-examplefile</code> Specifies URL for avro data 
file from * which to copy the input schema (can be local file, hdfs, url, etc).</li> * <li><code>-allowrecursive</code> Option to allow recursive schema @@ -153,6 +156,9 @@ public class AvroStorage extends LoadFun validOptions.addOption("f", "schemafile", true, "Specifies URL for avro schema file from which to read " + "the input or output schema"); + validOptions.addOption("c", "schemaclass", true, + "Specifies fully qualified class name for avro " + + "class in your classpath which implements GenericContainer."); validOptions.addOption("e", "examplefile", true, "Specifies URL for avro data file from which to copy " + "the output schema"); @@ -169,8 +175,14 @@ public class AvroStorage extends LoadFun if (configuredOptions.hasOption('f')) { try { Path p = new Path(configuredOptions.getOptionValue('f')); + Configuration conf; + if (UDFContext.getUDFContext().getJobConf()==null) { + conf = new Configuration(); + } else { + conf = UDFContext.getUDFContext().getJobConf(); + } Schema s = new Schema.Parser() - .parse((FileSystem.get(p.toUri(), new Configuration()).open(p))); + .parse((FileSystem.get(p.toUri(), conf).open(p))); setInputAvroSchema(s); setOutputAvroSchema(s); } catch (FileNotFoundException fnfe) { @@ -179,6 +191,25 @@ public class AvroStorage extends LoadFun "schema was described in a local file on the front end, and this message " + "is in the back end log, you can ignore this mesasge.)", fnfe); } + } else if (configuredOptions.hasOption('c')) { + String schemaClass = configuredOptions.getOptionValue('c'); + try { + Schema s = ((GenericContainer) Class.forName(schemaClass).newInstance()).getSchema(); + setInputAvroSchema(s); + setOutputAvroSchema(s); + } catch (ClassNotFoundException | IllegalAccessException cnfe) { + System.err.printf("class not found exception\n"); + log.error("Schema class '" + schemaClass + "' was not found in the classpath.", cnfe); + throw new RuntimeException(cnfe); + } catch (InstantiationException ie) { + 
System.err.printf("instantiation exception\n"); + log.error("Schema class '" + schemaClass + "' must have a public empty args constructor.", ie); + throw new RuntimeException(ie); + } catch (ClassCastException cce) { + System.err.printf("class cast exception\n"); + log.error("Schema class '" + schemaClass + "' must implement org.apache.avro.generic.GenericContainer interface.", cce); + throw new RuntimeException(cce); + } } else if (configuredOptions.hasOption('e')) { setOutputAvroSchema( getAvroSchema(configuredOptions.getOptionValue('e'), Modified: pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAvg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAvg.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAvg.java Fri Mar 4 18:17:39 2016 @@ -18,20 +18,20 @@ package org.apache.pig.builtin; import java.io.IOException; -import java.util.Iterator; import java.math.BigDecimal; import java.math.MathContext; +import java.util.Iterator; import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.backend.executionengine.ExecException; /** @@ -61,14 +61,17 @@ public class BigDecimalAvg extends EvalF } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -199,7 +202,7 @@ public 
class BigDecimalAvg extends EvalF DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } Modified: pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAvg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAvg.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAvg.java Fri Mar 4 18:17:39 2016 @@ -18,21 +18,21 @@ package org.apache.pig.builtin; import java.io.IOException; -import java.util.Iterator; import java.math.BigDecimal; import java.math.BigInteger; import java.math.MathContext; +import java.util.Iterator; import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.backend.executionengine.ExecException; /** @@ -62,14 +62,17 @@ public class BigIntegerAvg extends EvalF } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -201,7 +204,7 @@ public class BigIntegerAvg extends EvalF DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL - if (values.size() == 0) { + if (values == null || values.size() == 0) { return null; } Modified: pig/branches/spark/src/org/apache/pig/builtin/DateTimeMax.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/DateTimeMax.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/DateTimeMax.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/DateTimeMax.java Fri Mar 4 18:17:39 2016 @@ -20,8 +20,6 @@ package org.apache.pig.builtin; import java.io.IOException; import java.util.Iterator; -import org.joda.time.DateTime; - import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; @@ -32,6 +30,7 @@ import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.joda.time.DateTime; /** * This method should never be used directly, use {@link MAX}. @@ -47,14 +46,17 @@ public class DateTimeMax extends EvalFun } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -120,7 +122,7 @@ public class DateTimeMax extends EvalFun // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } @@ -153,7 +155,7 @@ public class DateTimeMax extends EvalFun @Override public Schema outputSchema(Schema input) { - return new Schema(new Schema.FieldSchema(null, DataType.DATETIME)); + return new Schema(new Schema.FieldSchema(null, DataType.DATETIME)); } Modified: pig/branches/spark/src/org/apache/pig/builtin/DateTimeMin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/DateTimeMin.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- 
pig/branches/spark/src/org/apache/pig/builtin/DateTimeMin.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/DateTimeMin.java Fri Mar 4 18:17:39 2016 @@ -20,8 +20,6 @@ package org.apache.pig.builtin; import java.io.IOException; import java.util.Iterator; -import org.joda.time.DateTime; - import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; @@ -32,6 +30,7 @@ import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.joda.time.DateTime; /** * This method should never be used directly, use {@link MAX}. @@ -47,14 +46,17 @@ public class DateTimeMin extends EvalFun } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -120,7 +122,7 @@ public class DateTimeMin extends EvalFun // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } Modified: pig/branches/spark/src/org/apache/pig/builtin/Distinct.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Distinct.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/Distinct.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/Distinct.java Fri Mar 4 18:17:39 2016 @@ -22,9 +22,7 @@ import java.io.IOException; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; -import org.apache.pig.JVMReuseManager; import org.apache.pig.PigConfiguration; -import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.executionengine.ExecException; import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.data.BagFactory; @@ -45,16 +43,6 @@ public class Distinct extends EvalFunc< private static boolean initialized = false; private static boolean useDefaultBag = false; - static { - JVMReuseManager.getInstance().registerForStaticDataCleanup(Distinct.class); - } - - @StaticDataCleanup - public static void staticDataCleanup() { - initialized = false; - useDefaultBag = false; - } - @Override public DataBag exec(Tuple input) throws IOException { return getDistinct(input); Modified: pig/branches/spark/src/org/apache/pig/builtin/DoubleAvg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/DoubleAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/DoubleAvg.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/DoubleAvg.java Fri Mar 4 18:17:39 2016 @@ -18,28 +18,25 @@ package org.apache.pig.builtin; import java.io.IOException; -import java.util.HashMap; import java.util.Iterator; import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; -import org.apache.pig.FuncSpec; import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.backend.executionengine.ExecException; /** * This method should never be used directly, use {@link AVG}. 
*/ public class DoubleAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> { - + private static TupleFactory mTupleFactory = TupleFactory.getInstance(); @Override @@ -56,21 +53,24 @@ public class DoubleAvg extends EvalFunc< Double avg = null; if (count > 0) avg = new Double(sum / count); - + return avg; } catch (ExecException ee) { throw ee; } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -91,7 +91,7 @@ public class DoubleAvg extends EvalFunc< t.set(0, d); if (d != null){ t.set(1, 1L); - }else{ + } else { t.set(1, 0L); } return t; @@ -100,9 +100,9 @@ public class DoubleAvg extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } - + } } @@ -117,7 +117,7 @@ public class DoubleAvg extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -145,7 +145,7 @@ public class DoubleAvg extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -166,7 +166,7 @@ public class DoubleAvg extends EvalFunc< Tuple t = it.next(); Double d = (Double)t.get(0); // we count nulls in avg as contributing 0 - // a departure from SQL for performance of + // a departure from SQL for performance of // COUNT() which implemented by just inspecting // 
size of the bag if(d == null) { @@ -200,9 +200,9 @@ public class DoubleAvg extends EvalFunc< static protected Double sum(Tuple input) throws ExecException, IOException { DataBag values = (DataBag)input.get(0); - + // if we were handed an empty bag, return NULL - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } @@ -228,17 +228,17 @@ public class DoubleAvg extends EvalFunc< return null; } } - + @Override public Schema outputSchema(Schema input) { - return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); + return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); } - + /* Accumulator interface */ - + private Double intermediateSum = null; private Double intermediateCount = null; - + @Override public void accumulate(Tuple b) throws IOException { try { @@ -251,7 +251,7 @@ public class DoubleAvg extends EvalFunc< intermediateSum = 0.0; intermediateCount = 0.0; } - + double count = (Long)count(b); if (count > 0) { @@ -263,9 +263,9 @@ public class DoubleAvg extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } - } + } @Override public void cleanup() { @@ -280,5 +280,5 @@ public class DoubleAvg extends EvalFunc< avg = new Double(intermediateSum / intermediateCount); } return avg; - } + } } Modified: pig/branches/spark/src/org/apache/pig/builtin/FloatAvg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/FloatAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/FloatAvg.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/FloatAvg.java Fri Mar 4 18:17:39 2016 @@ -24,19 +24,19 @@ import org.apache.pig.Accumulator; import 
org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.backend.executionengine.ExecException; /** * This method should never be used directly, use {@link AVG}. */ public class FloatAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> { - + private static TupleFactory mTupleFactory = TupleFactory.getInstance(); @Override @@ -53,21 +53,24 @@ public class FloatAvg extends EvalFunc<D Double avg = null; if (count > 0) avg = new Double(sum / count); - + return avg; } catch (ExecException ee) { throw ee; } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -96,9 +99,9 @@ public class FloatAvg extends EvalFunc<D } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } - + } } @@ -113,7 +116,7 @@ public class FloatAvg extends EvalFunc<D } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -141,7 +144,7 @@ public class FloatAvg extends EvalFunc<D } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new 
ExecException(msg, errCode, PigException.BUG, e); } } } @@ -162,7 +165,7 @@ public class FloatAvg extends EvalFunc<D Tuple t = it.next(); Double d = (Double)t.get(0); // we count nulls in avg as contributing 0 - // a departure from SQL for performance of + // a departure from SQL for performance of // COUNT() which implemented by just inspecting // size of the bag if(d == null) { @@ -199,7 +202,7 @@ public class FloatAvg extends EvalFunc<D DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } @@ -225,17 +228,17 @@ public class FloatAvg extends EvalFunc<D return null; } } - + @Override public Schema outputSchema(Schema input) { - return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); + return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); } - + /* Accumulator interface */ private Double intermediateSum = null; private Double intermediateCount = null; - + @Override public void accumulate(Tuple b) throws IOException { try { @@ -248,9 +251,9 @@ public class FloatAvg extends EvalFunc<D intermediateSum = 0.0; intermediateCount = 0.0; } - + double count = (Long)count(b); - + if (count > 0) { intermediateCount += count; intermediateSum += sum; @@ -260,9 +263,9 @@ public class FloatAvg extends EvalFunc<D } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } - } + } @Override public void cleanup() { @@ -277,6 +280,6 @@ public class FloatAvg extends EvalFunc<D avg = new Double(intermediateSum / intermediateCount); } return avg; - } + } } Modified: pig/branches/spark/src/org/apache/pig/builtin/IntAvg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/IntAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff 
============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/IntAvg.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/IntAvg.java Fri Mar 4 18:17:39 2016 @@ -24,19 +24,19 @@ import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.backend.executionengine.ExecException; /** * This method should never be used directly, use {@link AVG}. */ public class IntAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> { - + private static TupleFactory mTupleFactory = TupleFactory.getInstance(); @Override @@ -53,21 +53,24 @@ public class IntAvg extends EvalFunc<Dou Double avg = null; if (count > 0) avg = new Double(sum / count); - + return avg; } catch (ExecException ee) { throw ee; } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -75,7 +78,7 @@ public class IntAvg extends EvalFunc<Dou static public class Initial extends EvalFunc<Tuple> { @Override public Tuple exec(Tuple input) throws IOException { - + try { Tuple t = mTupleFactory.newTuple(2); // input is a bag with one tuple containing @@ -97,9 +100,9 @@ public class IntAvg extends EvalFunc<Dou } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } - + } } @@ -114,7 +117,7 @@ public class 
IntAvg extends EvalFunc<Dou } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -142,7 +145,7 @@ public class IntAvg extends EvalFunc<Dou } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -163,7 +166,7 @@ public class IntAvg extends EvalFunc<Dou Tuple t = it.next(); Long l = (Long)t.get(0); // we count nulls in avg as contributing 0 - // a departure from SQL for performance of + // a departure from SQL for performance of // COUNT() which implemented by just inspecting // size of the bag if(l == null) { @@ -200,7 +203,7 @@ public class IntAvg extends EvalFunc<Dou DataBag values = (DataBag)input.get(0); // if we were handed an empty bag, return NULL - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } @@ -211,7 +214,7 @@ public class IntAvg extends EvalFunc<Dou try { Integer i = (Integer)(t.get(0)); // we count nulls in avg as contributing 0 - // a departure from SQL for performance of + // a departure from SQL for performance of // COUNT() which implemented by just inspecting // size of the bag if (i == null) continue; @@ -230,17 +233,17 @@ public class IntAvg extends EvalFunc<Dou return null; } } - + @Override public Schema outputSchema(Schema input) { - return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); + return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); } /* Accumulator interface */ private Long intermediateSum = null; private Double intermediateCount = null; - + @Override public void accumulate(Tuple b) throws IOException { try { @@ -253,7 +256,7 @@ public class IntAvg extends 
EvalFunc<Dou intermediateSum = 0L; intermediateCount = 0.0; } - + double count = (Long)count(b); if (count > 0) { @@ -265,9 +268,9 @@ public class IntAvg extends EvalFunc<Dou } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } - } + } @Override public void cleanup() { Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java Fri Mar 4 18:17:39 2016 @@ -25,14 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.math.BigDecimal; -import org.joda.time.format.ISODateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -56,37 +49,44 @@ import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; /** * A loader for data stored using {@link JsonStorage}. This is not a generic * JSON loader. 
It depends on the schema being stored with the data when * conceivably you could write a loader that determines the schema from the - * JSON. + * JSON. */ public class JsonLoader extends LoadFunc implements LoadMetadata { protected RecordReader reader = null; protected ResourceSchema schema = null; - + private String udfcSignature = null; private JsonFactory jsonFactory = null; private TupleFactory tupleFactory = TupleFactory.getInstance(); private BagFactory bagFactory = BagFactory.getInstance(); - + private static final String SCHEMA_SIGNATURE = "pig.jsonloader.schema"; - + public JsonLoader() { } - + public JsonLoader(String schemaString) throws IOException { schema = new ResourceSchema(Utils.parseSchema(schemaString)); } + @Override public void setLocation(String location, Job job) throws IOException { // Tell our input format where we will be reading from FileInputFormat.setInputPaths(job, location); } - + + @Override @SuppressWarnings("unchecked") public InputFormat getInputFormat() throws IOException { // We will use TextInputFormat, the default Hadoop input format for @@ -95,17 +95,19 @@ public class JsonLoader extends LoadFunc return new TextInputFormat(); } + @Override public LoadCaster getLoadCaster() throws IOException { // We do not expect to do casting of byte arrays, because we will be // returning typed data. return null; } + @Override @SuppressWarnings("unchecked") public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { this.reader = reader; - + // Get the schema string from the UDFContext object. UDFContext udfc = UDFContext.getUDFContext(); Properties p = @@ -121,6 +123,7 @@ public class JsonLoader extends LoadFunc jsonFactory = new JsonFactory(); } + @Override public Tuple getNext() throws IOException { Text val = null; try { @@ -150,31 +153,34 @@ public class JsonLoader extends LoadFunc // isn't what we expect we return a tuple with null fields rather than // throwing an exception. 
That way a few mangled lines don't fail the // job. - + try { if (p.nextToken() != JsonToken.START_OBJECT) { warn("Bad record, could not find start of record " + val.toString(), PigWarning.UDF_WARNING_1); return t; } - + // Read each field in the record for (int i = 0; i < fields.length; i++) { t.set(i, readField(p, fields[i], i)); } - + if (p.nextToken() != JsonToken.END_OBJECT) { warn("Bad record, could not find end of record " + val.toString(), PigWarning.UDF_WARNING_1); return t; } - + } catch (Exception jpe) { - warn("Bad record, returning null for " + val, PigWarning.UDF_WARNING_1); + Throwable ex = jpe.getCause() == null ? jpe : jpe.getCause(); + warn("Encountered exception " + ex.getClass().getName() + ": " + + ex.getMessage() + ". Bad record, returning null for " + + val, PigWarning.UDF_WARNING_1); } finally { p.close(); } - + return t; } @@ -186,44 +192,44 @@ public class JsonLoader extends LoadFunc // Read based on our expected type case DataType.BOOLEAN: return p.getBooleanValue(); - + case DataType.INTEGER: return p.getIntValue(); - + case DataType.LONG: return p.getLongValue(); - + case DataType.FLOAT: return p.getFloatValue(); - + case DataType.DOUBLE: return p.getDoubleValue(); - + case DataType.DATETIME: DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser(); return formatter.withOffsetParsed().parseDateTime(p.getText()); - + case DataType.BYTEARRAY: byte[] b = p.getText().getBytes(); // Use the DBA constructor that copies the bytes so that we own // the memory return new DataByteArray(b, 0, b.length); - + case DataType.CHARARRAY: return p.getText(); - + case DataType.BIGINTEGER: return p.getBigIntegerValue(); - + case DataType.BIGDECIMAL: return new BigDecimal(p.getText()); - + default: throw new IOException("Unknown type in input schema: " + field.getType() ); } } - + private Object readField(JsonParser p, ResourceFieldSchema field, int fieldnum) throws IOException { @@ -237,7 +243,7 @@ public class JsonLoader extends LoadFunc // Check 
to see if this value was null if (tok == JsonToken.VALUE_NULL) return null; - + tok = p.nextToken(); // Read based on our expected type @@ -324,11 +330,13 @@ public class JsonLoader extends LoadFunc } //------------------------------------------------------------------------ - + + @Override public void setUDFContextSignature(String signature) { udfcSignature = signature; } + @Override public ResourceSchema getSchema(String location, Job job) throws IOException { @@ -338,12 +346,12 @@ public class JsonLoader extends LoadFunc } else { // Parse the schema s = (new JsonMetadata()).getSchema(location, job, true); - + if (s == null) { throw new IOException("Unable to parse schema found in file in " + location); } } - + // Now that we have determined the schema, store it in our // UDFContext properties object so we have it when we need it on the // backend @@ -355,18 +363,21 @@ public class JsonLoader extends LoadFunc return s; } - public ResourceStatistics getStatistics(String location, Job job) + @Override + public ResourceStatistics getStatistics(String location, Job job) throws IOException { // We don't implement this one. 
return null; } + @Override public String[] getPartitionKeys(String location, Job job) throws IOException { // We don't have partitions return null; } + @Override public void setPartitionFilter(Expression partitionFilter) throws IOException { // We don't have partitions Modified: pig/branches/spark/src/org/apache/pig/builtin/LongAvg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/LongAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/LongAvg.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/LongAvg.java Fri Mar 4 18:17:39 2016 @@ -24,19 +24,19 @@ import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.backend.executionengine.ExecException; /** * This method should never be used directly, use {@link AVG}. 
*/ public class LongAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> { - + private static TupleFactory mTupleFactory = TupleFactory.getInstance(); @Override @@ -53,21 +53,24 @@ public class LongAvg extends EvalFunc<Do Double avg = null; if (count > 0) avg = new Double(sum / count); - + return avg; } catch (ExecException ee) { throw ee; } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -96,9 +99,9 @@ public class LongAvg extends EvalFunc<Do } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } - + } } @@ -113,7 +116,7 @@ public class LongAvg extends EvalFunc<Do } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -141,7 +144,7 @@ public class LongAvg extends EvalFunc<Do } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -162,7 +165,7 @@ public class LongAvg extends EvalFunc<Do Tuple t = it.next(); Long l = (Long)t.get(0); // we count nulls in avg as contributing 0 - // a departure from SQL for performance of + // a departure from SQL for performance of // COUNT() which implemented by just inspecting // size of the bag if(l == null) { @@ -199,7 +202,7 @@ public class LongAvg extends EvalFunc<Do DataBag values = (DataBag)input.get(0); // if we were handed an 
empty bag, return NULL - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } @@ -225,17 +228,17 @@ public class LongAvg extends EvalFunc<Do return null; } } - + @Override public Schema outputSchema(Schema input) { - return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); + return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); } - + /* Accumulator interface */ - + private Long intermediateSum = null; private Double intermediateCount = null; - + @Override public void accumulate(Tuple b) throws IOException { try { @@ -248,7 +251,7 @@ public class LongAvg extends EvalFunc<Do intermediateSum = 0L; intermediateCount = 0.0; } - + double count = (Long)count(b); if (count > 0) { @@ -260,9 +263,9 @@ public class LongAvg extends EvalFunc<Do } catch (Exception e) { int errCode = 2106; String msg = "Error while computing average in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } - } + } @Override public void cleanup() { Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Fri Mar 4 18:17:39 2016 @@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.orc.CompressionKind; import org.apache.hadoop.hive.ql.io.orc.OrcFile; @@ -91,7 +92,7 @@ import org.apache.pig.impl.logicalLayer. 
import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; -import org.apache.pig.impl.util.orc.OrcUtils; +import org.apache.pig.impl.util.hive.HiveUtils; import org.joda.time.DateTime; import com.esotericsoftware.kryo.io.Input; @@ -235,7 +236,7 @@ public class OrcStorage extends LoadFunc typeInfo = (TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix)); } if (oi==null) { - oi = OrcUtils.createObjectInspector(typeInfo); + oi = HiveUtils.createObjectInspector(typeInfo); } } @@ -244,7 +245,7 @@ public class OrcStorage extends LoadFunc ResourceFieldSchema fs = new ResourceFieldSchema(); fs.setType(DataType.TUPLE); fs.setSchema(rs); - typeInfo = OrcUtils.getTypeInfo(fs); + typeInfo = HiveUtils.getTypeInfo(fs); Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass()); p.setProperty(signature + SchemaSignatureSuffix, ObjectSerializer.serialize(typeInfo)); } @@ -376,7 +377,7 @@ public class OrcStorage extends LoadFunc } Object value = in.getCurrentValue(); - Tuple t = (Tuple)OrcUtils.convertOrcToPig(value, oi, mRequiredColumns); + Tuple t = (Tuple)HiveUtils.convertHiveToPig(value, oi, mRequiredColumns); return t; } catch (InterruptedException e) { int errCode = 6018; @@ -406,7 +407,7 @@ public class OrcStorage extends LoadFunc return FuncUtils.getShipFiles(classList); } - private static Path getFirstFile(String location, FileSystem fs) throws IOException { + private static Path getFirstFile(String location, FileSystem fs, PathFilter filter) throws IOException { String[] locations = getPathStrings(location); Path[] paths = new Path[locations.length]; for (int i = 0; i < paths.length; ++i) { @@ -423,7 +424,7 @@ public class OrcStorage extends LoadFunc } FileStatus[] statusArray = (FileStatus[]) statusList .toArray(new FileStatus[statusList.size()]); - Path p = Utils.depthFirstSearchForFile(statusArray, fs); + Path p = 
Utils.depthFirstSearchForFile(statusArray, fs, filter); return p; } @@ -438,7 +439,7 @@ public class OrcStorage extends LoadFunc } } - ResourceFieldSchema fs = OrcUtils.getResourceFieldSchema(typeInfo); + ResourceFieldSchema fs = HiveUtils.getResourceFieldSchema(typeInfo); return fs.getSchema(); } @@ -456,7 +457,7 @@ public class OrcStorage extends LoadFunc private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException { FileSystem fs = FileSystem.get(job.getConfiguration()); - Path path = getFirstFile(location, fs); + Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs)); if (path == null) { log.info("Cannot find any ORC files from " + location + ". Probably multiple load store in script."); @@ -467,6 +468,28 @@ public class OrcStorage extends LoadFunc return TypeInfoUtils.getTypeInfoFromObjectInspector(oip); } + public static class NonEmptyOrcFileFilter implements PathFilter { + private FileSystem fs; + public NonEmptyOrcFileFilter(FileSystem fs) { + this.fs = fs; + } + @Override + public boolean accept(Path path) { + Reader reader; + try { + reader = OrcFile.createReader(fs, path); + ObjectInspector oip = (ObjectInspector)reader.getObjectInspector(); + ResourceFieldSchema rs = HiveUtils.getResourceFieldSchema(TypeInfoUtils.getTypeInfoFromObjectInspector(oip)); + if (rs.getSchema().getFields().length!=0) { + return true; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return false; + } + } + @Override public ResourceStatistics getStatistics(String location, Job job) throws IOException { Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original) +++ 
pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Fri Mar 4 18:17:39 2016 @@ -54,6 +54,7 @@ import org.apache.pig.LoadFunc; import org.apache.pig.LoadMetadata; import org.apache.pig.LoadPushDown; import org.apache.pig.OverwritableStoreFunc; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; @@ -67,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; @@ -159,6 +161,10 @@ LoadPushDown, LoadMetadata, StoreMetadat private static final String TAG_SOURCE_PATH = "tagPath"; private Path sourcePath = null; + // it determines whether to depend on pig's own Bzip2TextInputFormat or + // to simply depend on hadoop for handling bzip2 inputs + private boolean bzipinput_usehadoops ; + private Options populateValidOptions() { Options validOptions = new Options(); validOptions.addOption("schema", false, "Loads / Stores the schema of the relation using a hidden JSON file."); @@ -419,9 +425,12 @@ LoadPushDown, LoadMetadata, StoreMetadat @Override public InputFormat getInputFormat() { - if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) { + if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) + && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) { + mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { + mLog.info("Using PigTextInputFormat"); return new PigTextInputFormat(); } } @@ -439,6 +448,9 @@ LoadPushDown, 
LoadMetadata, StoreMetadat throws IOException { loadLocation = location; FileInputFormat.setInputPaths(job, location); + bzipinput_usehadoops = job.getConfiguration().getBoolean( + PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT, + true ); } @Override Modified: pig/branches/spark/src/org/apache/pig/builtin/PluckTuple.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PluckTuple.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/PluckTuple.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/PluckTuple.java Fri Mar 4 18:17:39 2016 @@ -20,6 +20,7 @@ package org.apache.pig.builtin; import java.io.IOException; import java.util.List; +import java.util.regex.Pattern; import org.apache.pig.EvalFunc; import org.apache.pig.data.DataType; @@ -35,6 +36,8 @@ import com.google.common.collect.Lists; * filter for the columns in a relation that begin with that prefix. 
* * Example: + * + * 1) Prefix * a = load 'a' as (x, y); * b = load 'b' as (x, y); * c = join a by x, b by x; @@ -44,16 +47,35 @@ import com.google.common.collect.Lists; * c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray} * describe d; * d: {plucked::a::x: bytearray,plucked::a::y: bytearray} + * + * 2) Regex + * a = load 'a' as (x, y); + * b = load 'b' as (x, y); + * c = join a by x, b by x; + * DEFINE pluck PluckTuple('.*::y'); + * d = foreach c generate FLATTEN(pluck(*)); + * describe c; + * c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray} + * describe d; + * d: {plucked::a::y: bytearray,plucked::a::y: bytearray} */ public class PluckTuple extends EvalFunc<Tuple> { private static final TupleFactory mTupleFactory = TupleFactory.getInstance(); + private Pattern pattern; private boolean isInitialized = false; private int[] indicesToInclude; private String prefix; + private boolean match; public PluckTuple(String prefix) { + this(prefix,"true"); + } + + public PluckTuple(String prefix, String match) { this.prefix = prefix; + this.match = Boolean.valueOf(match); + pattern = Pattern.compile(prefix); } @Override @@ -63,7 +85,10 @@ public class PluckTuple extends EvalFunc Schema inputSchema = getInputSchema(); for (int i = 0; i < inputSchema.size(); i++) { String alias = inputSchema.getField(i).alias; - if (alias.startsWith(prefix)) { + if (this.match && (alias.startsWith(prefix) || pattern.matcher(alias).matches()) ) { + indicesToInclude.add(i); + } + else if (!this.match && !alias.startsWith(prefix) && !pattern.matcher(alias).matches() ){ indicesToInclude.add(i); } } @@ -92,7 +117,10 @@ public class PluckTuple extends EvalFunc } catch (FrontendException e) { throw new RuntimeException(e); // Should never happen } - if (alias.startsWith(prefix)) { + if (this.match && (alias.startsWith(prefix) || pattern.matcher(alias).matches())) { + indicesToInclude.add(i); + } + else if (!this.match && !alias.startsWith(prefix) && 
!pattern.matcher(alias).matches()){ indicesToInclude.add(i); } } Modified: pig/branches/spark/src/org/apache/pig/builtin/RANDOM.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RANDOM.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/RANDOM.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/RANDOM.java Fri Mar 4 18:17:39 2016 @@ -22,6 +22,10 @@ import java.io.IOException; import java.util.Random; import org.apache.pig.EvalFunc; +import org.apache.pig.PigConstants; +import org.apache.pig.StaticDataCleanup; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.data.DataType; @@ -32,23 +36,60 @@ import org.apache.pig.data.DataType; */ @Nondeterministic public class RANDOM extends EvalFunc<Double>{ - private Random r; + private Random r = null; public RANDOM() { - r = new Random(); } public RANDOM(String seed) { r = new Random(Long.parseLong(seed)); } - @Override - public Double exec(Tuple input) throws IOException { - return r.nextDouble(); - } + @Override + public Double exec(Tuple input) throws IOException { + if( r == null ) { + int jobidhash = PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode(); + int taskIndex = Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX)); + + // XOR-ing 3 separate values + // |<-----32 bits---->|<----32 bits----->| + // |-jobidhash(int)---|-jobidhash(int)---| + // | |---taskIndex(int)--| + // |----------seedUniquifier (long)------| + // | | + // |<-- Only 48 bits used ----->| + // | by java.util.Random | + // | | + // + // Reason for repeating jobidhash and shifting taskIndex is, seed + 
// too close to each other would produce very similar values. + // + // Goal of this method is to produce pseudo-random values that + // would + // (1) Produce the same sequence of pseudo-random variables for attempts from same jobid/vertexid and taskid + // (2) When taskid, jobid, or vertexid(tez) differ, they should produce a different random sequence + // (3) When Random is called more than once inside the script, they should also produce different random values + // e.g. B = FOREACH A generate RANDOM(), RANDOM(); + // + r = new Random( (((long) jobidhash) << 32 | (jobidhash & 0xffffffffL)) ^ ((long) taskIndex << 16) ^ seedUniquifier); + + // L'Ecuyer, "Tables of Linear Congruential Generators of + // Different Sizes and Good Lattice Structure", 1999 + seedUniquifier *= 4292484099903637661L; + } + return r.nextDouble(); + } @Override public Schema outputSchema(Schema input) { return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.DOUBLE)); } + + // Taking the initial seed value from java.util.Random + private static long seedUniquifier = 8682522807148012L; + + @StaticDataCleanup + public static void resetSeedUniquifier() { + seedUniquifier = 8682522807148012L; + } } Modified: pig/branches/spark/src/org/apache/pig/builtin/REPLACE.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/REPLACE.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/REPLACE.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/REPLACE.java Fri Mar 4 18:17:39 2016 @@ -21,14 +21,15 @@ package org.apache.pig.builtin; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.regex.Pattern; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.PigWarning; -import org.apache.pig.data.Tuple; import 
org.apache.pig.data.DataType; -import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.schema.Schema; /** * REPLACE implements eval function to replace part of a string. @@ -42,6 +43,8 @@ import org.apache.pig.impl.logicalLayer. */ public class REPLACE extends EvalFunc<String> { + private Pattern mPattern = null; + /** * Method invoked on every tuple during foreach evaluation * @param input tuple; first column is assumed to have the column to convert @@ -52,13 +55,29 @@ public class REPLACE extends EvalFunc<St if (input == null || input.size() < 3) return null; - try{ - String source = (String)input.get(0); - String target = (String)input.get(1); - String replacewith = (String)input.get(2); - return source.replaceAll(target, replacewith); + String source = (String)input.get(0); + String target = (String)input.get(1); + + if (target == null) { + warn("Replace : Regular expression is null", PigWarning.UDF_WARNING_1); + return null; + } + + if (mPattern == null || ! 
target.equals(mPattern.pattern())) { + try { + mPattern = Pattern.compile(target); + } catch (Exception e) { + warn("Replace : Mal-Formed Regular expression : " + target, PigWarning.UDF_WARNING_1); + return null; + } + } + + String replacewith = (String)input.get(2); + + try { + return mPattern.matcher(source).replaceAll(replacewith); }catch(Exception e){ - warn("Failed to process input; error - " + e.getMessage(), PigWarning.UDF_WARNING_1); + warn("Replace : Failed to process input; error - " + e.getMessage(), PigWarning.UDF_WARNING_1); return null; } } Modified: pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java Fri Mar 4 18:17:39 2016 @@ -47,10 +47,6 @@ public class RollupDimensions extends Ev private static BagFactory bf = BagFactory.getInstance(); private static TupleFactory tf = TupleFactory.getInstance(); private final String allMarker; - // the pivot position - private int pivot = -1; - // to check if rollup is optimized or not - private boolean rollupHIIoptimizable = false; public RollupDimensions() { this(null); @@ -61,18 +57,6 @@ public class RollupDimensions extends Ev this.allMarker = allMarker; } - public void setRollupHIIOptimizable(boolean check) { - this.rollupHIIoptimizable = check; - } - - public boolean getRollupHIIOptimizable() { - return this.rollupHIIoptimizable; - } - - public void setPivot(int pvt) throws IOException { - this.pivot = pvt; - } - @Override public DataBag exec(Tuple tuple) throws IOException { List<Tuple> result = Lists.newArrayListWithCapacity(tuple.size() + 1); @@ -82,32 +66,12 @@ public class RollupDimensions extends Ev return 
bf.newDefaultBag(result); } - private void iterativelyRollup(List<Tuple> result, Tuple input) - throws IOException { - - Tuple tempTup = tf.newTuple(input.getAll()); - - //if (this.rollupHIIoptimizable != null) { // rule is enabled - if (this.rollupHIIoptimizable == true) { - if (this.pivot == -1) // user did not specify the pivot position - // --> IRG approach - return; - else { // user did specify the pivot position --> IRG + IRG - if (this.pivot == 0) // we use the IRG approach - return; - else { // we use IRG+IRG approach - for (int i = this.pivot - 1; i < input.size(); i++) - tempTup.set(i, allMarker); - result.add(tf.newTuple(tempTup.getAll())); - } - } - } - else { // we can not optimize --> Vanilla approach - for (int i = input.size() - 1; i >= 0; i--) { - tempTup.set(i, allMarker); - result.add(tf.newTuple(tempTup.getAll())); - } - } + private void iterativelyRollup(List<Tuple> result, Tuple input) throws ExecException { + Tuple tempTup = tf.newTuple(input.getAll()); + for (int i = input.size() - 1; i >= 0; i--) { + tempTup.set(i, allMarker); + result.add(tf.newTuple(tempTup.getAll())); + } } @Override Modified: pig/branches/spark/src/org/apache/pig/builtin/StringMax.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/StringMax.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/StringMax.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/StringMax.java Fri Mar 4 18:17:39 2016 @@ -45,14 +45,17 @@ public class StringMax extends EvalFunc< } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -77,7 +80,7 @@ public class StringMax extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error 
while computing max in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -94,7 +97,7 @@ public class StringMax extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error while computing max in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -108,17 +111,17 @@ public class StringMax extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error while computing max in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } static protected String max(Tuple input) throws ExecException { DataBag values = (DataBag)input.get(0); - + // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } @@ -129,7 +132,7 @@ public class StringMax extends EvalFunc< Tuple t = it.next(); curMax = (String)(t.get(0)); } - + for (; it.hasNext();) { Tuple t = it.next(); try { @@ -138,26 +141,26 @@ public class StringMax extends EvalFunc< if( s.compareTo(curMax) > 0) { curMax = s; } - + } catch (RuntimeException exp) { int errCode = 2103; String msg = "Problem while computing max of strings."; throw new ExecException(msg, errCode, PigException.BUG, exp); } } - + return curMax; } @Override public Schema outputSchema(Schema input) { - return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); + return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); } /* accumulator interface */ private String intermediateMax = null; - + @Override public void accumulate(Tuple b) throws IOException { try { @@ -166,16 +169,16 @@ public class StringMax extends EvalFunc< return; } // check 
if it lexicographically follows curMax - if (intermediateMax == null || intermediateMax.compareTo(curMax) > 0) { + if (intermediateMax == null || intermediateMax.compareTo(curMax) < 0) { intermediateMax = curMax; - } + } } catch (ExecException ee) { throw ee; } catch (Exception e) { int errCode = 2106; String msg = "Error while computing max in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } }
