Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
 Fri Mar  4 18:17:39 2016
@@ -206,7 +206,9 @@ public class CombinerOptimizerUtil {
                 // fix projection and function time for algebraic functions in 
reduce foreach
                 for (Pair<PhysicalOperator, PhysicalPlan> op2plan : 
algebraicOps) {
                     setProjectInput(op2plan.first, op2plan.second, 
op2newpos.get(op2plan.first));
+                    byte resultType = op2plan.first.getResultType();
                     
((POUserFunc)op2plan.first).setAlgebraicFunction(POUserFunc.FINAL);
+                    op2plan.first.setResultType(resultType);
                 }
 
                 // we have modified the foreach inner plans - so set them again
@@ -251,7 +253,7 @@ public class CombinerOptimizerUtil {
                 POLocalRearrange mlr = getNewRearrange(rearrange);
                 POPartialAgg mapAgg = null;
                 if (doMapAgg) {
-                    mapAgg = createPartialAgg(cfe);
+                    mapAgg = createPartialAgg(cfe, isGroupAll(rearrange));
                 }
 
                 // A specialized local rearrange operator will replace
@@ -285,16 +287,31 @@ public class CombinerOptimizerUtil {
         }
     }
 
+    private static boolean isGroupAll(POLocalRearrange lr) { // detects a rearrange whose only key is the constant "all"
+        // B: Local Rearrange[tuple]{chararray}(false) - scope-3    ->   
scope-14
+        // |   |
+        // |   Constant(all) - scope-4
+        boolean isGroupAll = false; // stays false unless every check below passes
+        if (lr.getPlans().size() == 1) { // exactly one key plan on the rearrange
+            PhysicalPlan plan = lr.getPlans().get(0); // the sole key plan
+            if (plan.getKeys().size() == 1 && (plan.getRoots().get(0) 
instanceof ConstantExpression)) { // NOTE(review): getKeys().size() == 1 presumably means a one-operator plan - confirm
+                ConstantExpression constExpr = (ConstantExpression) 
plan.getRoots().get(0);
+                isGroupAll = ("all").equals(constExpr.getValue()); // literal-first equals: a null value yields false, not an NPE
+            }
+        }
+        return isGroupAll; // forwarded by the caller to createPartialAgg
+    }
+
     /**
      * Translate POForEach in combiner into a POPartialAgg
      * @param combineFE
      * @return partial aggregate operator
      * @throws CloneNotSupportedException
      */
-    private static POPartialAgg createPartialAgg(POForEach combineFE) throws 
CloneNotSupportedException {
+    private static POPartialAgg createPartialAgg(POForEach combineFE, boolean 
isGroupAll) throws CloneNotSupportedException {
         String scope = combineFE.getOperatorKey().scope;
         POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope,
-                NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+                NodeIdGenerator.getGenerator().getNextNodeId(scope)), 
isGroupAll);
         poAgg.addOriginalLocation(combineFE.getAlias(), 
combineFE.getOriginalLocations());
         poAgg.setResultType(combineFE.getResultType());
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java 
Fri Mar  4 18:17:39 2016
@@ -16,8 +16,6 @@
  */
 package org.apache.pig.backend.hadoop.hbase;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -48,8 +46,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -88,6 +86,7 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import 
org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
 import org.apache.pig.builtin.FuncUtils;
 import org.apache.pig.builtin.Utf8StorageConverter;
@@ -205,6 +204,7 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("cacheBlocks", true, "Set whether blocks 
should be cached for the scan");
         validOptions_.addOption("caching", true, "Number of rows scanners 
should cache");
         validOptions_.addOption("limit", true, "Per-region limit");
+        validOptions_.addOption("maxResultsPerColumnFamily", true, "Limit the 
maximum number of values returned per row per column family");
         validOptions_.addOption("delim", true, "Column delimiter");
         validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when 
parsing columns");
         validOptions_.addOption("caster", true, "Caster to use for converting 
values. A class name, " +
@@ -251,6 +251,7 @@ public class HBaseStorage extends LoadFu
      * <li>-lte=maxKeyVal
      * <li>-regex=match regex on KeyVal
      * <li>-limit=numRowsPerRegion max number of rows to retrieve per region
+     * <li>-maxResultsPerColumnFamily=numValues max number of values 
returned per row per column family
      * <li>-delim=char delimiter to use when parsing column names (default is 
space or comma)
      * <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column 
names (default true)
      * <li>-cacheBlocks=(true|false) Set whether blocks should be cached for 
the scan (default false).
@@ -275,7 +276,7 @@ public class HBaseStorage extends LoadFu
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] 
[-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] 
[-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] 
[-includeTimestamp] [-includeTombstone]", validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] 
[-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] 
[-maxResultsPerColumnFamily] [-delim] [-ignoreWhitespace] [-minTimestamp] 
[-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", 
validOptions_ );
             throw e;
         }
 
@@ -476,6 +477,10 @@ public class HBaseStorage extends LoadFu
         if (configuredOptions_.hasOption("timestamp")){
             scan.setTimeStamp(timestamp_);
         }
+        if (configuredOptions_.hasOption("maxResultsPerColumnFamily")){
+            int maxResultsPerColumnFamily_ = 
Integer.valueOf(configuredOptions_.getOptionValue("maxResultsPerColumnFamily"));
+            scan.setMaxResultsPerColumnFamily(maxResultsPerColumnFamily_);
+        }
 
         // if the group of columnInfos for this family doesn't contain a 
prefix, we don't need
         // to set any filters, we can just call addColumn or addFamily. See 
javadocs below.
@@ -818,7 +823,9 @@ public class HBaseStorage extends LoadFu
         List<Class> classList = new ArrayList<Class>();
         classList.add(org.apache.hadoop.hbase.client.HTable.class); // main 
hbase jar or hbase-client
         classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // 
main hbase jar or hbase-server
-        classList.add(com.google.common.collect.Lists.class); // guava
+        if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 
0.23/2 itself has guava
+            classList.add(com.google.common.collect.Lists.class); // guava
+        }
         classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
         // Additional jars that are specific to v0.95.0+
         addClassToList("org.cloudera.htrace.Trace", classList); // htrace
@@ -1052,7 +1059,8 @@ public class HBaseStorage extends LoadFu
      * @throws IOException
      */
     public Delete createDelete(Object key, byte type, long timestamp) throws 
IOException {
-        Delete delete = new Delete(objToBytes(key, type), timestamp);
+        Delete delete = new Delete(objToBytes(key, type));
+        delete.setTimestamp(timestamp);
 
         if(noWAL_) {
             delete.setWriteToWAL(false);
@@ -1188,7 +1196,7 @@ public class HBaseStorage extends LoadFu
         this.requiredFieldList = requiredFieldList;
 
         if (requiredFieldList != null && requiredFields.size() > 
(columnInfo_.size() + colOffset)) {
-            throw new FrontendException("The list of columns to project from 
HBase is larger than HBaseStorage is configured to load.");
+            throw new FrontendException("The list of columns to project from 
HBase (" + requiredFields.size() + ") is larger than HBaseStorage is configured 
to load (" + (columnInfo_.size() + colOffset) + ").");
         }
 
         // remember the projection
@@ -1215,9 +1223,10 @@ public class HBaseStorage extends LoadFu
         return new RequiredFieldResponse(true);
     }
 
+    @Override
     public void ensureAllKeyInstancesInSameSplit() throws IOException {
-        /** 
-         * no-op because hbase keys are unique 
+        /**
+         * no-op because hbase keys are unique
          * This will also work with things like 
DelimitedKeyPrefixRegionSplitPolicy
          * if you need a partial key match to be included in the split
          */
@@ -1232,7 +1241,7 @@ public class HBaseStorage extends LoadFu
             throw new RuntimeException("LoadFunc expected split of type 
TableSplit but was " + split.getClass().getName());
         }
     }
- 
+
 
     /**
      * Class to encapsulate logic around which column names were specified in 
each

Modified: pig/branches/spark/src/org/apache/pig/builtin/AVG.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AVG.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/AVG.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/AVG.java Fri Mar  4 18:17:39 
2016
@@ -27,6 +27,7 @@ import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -34,7 +35,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
 
 
 /**
@@ -79,14 +79,17 @@ public class AVG extends EvalFunc<Double
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -230,7 +233,7 @@ public class AVG extends EvalFunc<Double
         DataBag values = (DataBag)input.get(0);
 
         // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 

Modified: 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigDecimalMathBase.java 
Fri Mar  4 18:17:39 2016
@@ -18,8 +18,8 @@
 package org.apache.pig.builtin;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.math.BigDecimal;
+import java.util.Iterator;
 
 import org.apache.pig.Accumulator;
 import org.apache.pig.PigException;
@@ -88,7 +88,7 @@ public abstract class AlgebraicBigDecima
         DataBag values = (DataBag)input.get(0);
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
         BigDecimal sofar = 
AlgebraicBigDecimalMathBase.getSeed(opProvider.getOp());

Modified: 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicBigIntegerMathBase.java 
Fri Mar  4 18:17:39 2016
@@ -18,8 +18,8 @@
 package org.apache.pig.builtin;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.math.BigInteger;
+import java.util.Iterator;
 
 import org.apache.pig.Accumulator;
 import org.apache.pig.PigException;
@@ -88,7 +88,7 @@ public abstract class AlgebraicBigIntege
         DataBag values = (DataBag)input.get(0);
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
         BigInteger sofar = 
AlgebraicBigIntegerMathBase.getSeed(opProvider.getOp());

Modified: 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicByteArrayMathBase.java 
Fri Mar  4 18:17:39 2016
@@ -67,7 +67,7 @@ public abstract class AlgebraicByteArray
         DataBag values = (DataBag)input.get(0);
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
         double sofar = AlgebraicByteArrayMathBase.getSeed(opProvider.getOp());

Modified: 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicDoubleMathBase.java 
Fri Mar  4 18:17:39 2016
@@ -64,7 +64,7 @@ public abstract class AlgebraicDoubleMat
         DataBag values = (DataBag)input.get(0);
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
         double sofar = AlgebraicDoubleMathBase.getSeed(opProvider.getOp());

Modified: 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicFloatMathBase.java 
Fri Mar  4 18:17:39 2016
@@ -64,7 +64,7 @@ public abstract class AlgebraicFloatMath
         DataBag values = (DataBag)input.get(0);
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
         Float sofar = AlgebraicFloatMathBase.getSeed(opProvider.getOp());

Modified: 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicIntMathBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicIntMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicIntMathBase.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicIntMathBase.java Fri 
Mar  4 18:17:39 2016
@@ -64,7 +64,7 @@ public abstract class AlgebraicIntMathBa
         DataBag values = (DataBag)input.get(0);
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
         int sofar = AlgebraicIntMathBase.getSeed(opProvider.getOp());

Modified: 
pig/branches/spark/src/org/apache/pig/builtin/AlgebraicLongMathBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AlgebraicLongMathBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/AlgebraicLongMathBase.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/builtin/AlgebraicLongMathBase.java 
Fri Mar  4 18:17:39 2016
@@ -64,7 +64,7 @@ public abstract class AlgebraicLongMathB
         DataBag values = (DataBag)input.get(0);
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
         Long sofar = AlgebraicLongMathBase.getSeed(opProvider.getOp());

Modified: pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java Fri Mar  4 
18:17:39 2016
@@ -29,6 +29,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.mapred.AvroInputFormat;
@@ -120,6 +121,8 @@ public class AvroStorage extends LoadFun
    *  <li><code>-schemafile</code> Specifies URL for avro schema file
    *    from which to read the input schema (can be local file, hdfs,
    *    url, etc).</li>
+   *  <li><code>-schemaclass</code> Specifies fully qualified class name for 
avro
+   *    class in your classpath which implements GenericContainer.</li>
    *  <li><code>-examplefile</code> Specifies URL for avro data file from
    *    which to copy the input schema (can be local file, hdfs, url, 
etc).</li>
    *  <li><code>-allowrecursive</code> Option to allow recursive schema
@@ -153,6 +156,9 @@ public class AvroStorage extends LoadFun
         validOptions.addOption("f", "schemafile", true,
             "Specifies URL for avro schema file from which to read "
             + "the input or output schema");
+        validOptions.addOption("c", "schemaclass", true,
+            "Specifies fully qualified class name for avro "
+            + "class in your classpath which implements GenericContainer.");
         validOptions.addOption("e", "examplefile", true,
             "Specifies URL for avro data file from which to copy "
             + "the output schema");
@@ -169,8 +175,14 @@ public class AvroStorage extends LoadFun
         if (configuredOptions.hasOption('f')) {
           try {
             Path p = new Path(configuredOptions.getOptionValue('f'));
+            Configuration conf;
+            if (UDFContext.getUDFContext().getJobConf()==null) {
+                conf = new Configuration();
+            } else {
+                conf = UDFContext.getUDFContext().getJobConf();
+            }
             Schema s = new Schema.Parser()
-              .parse((FileSystem.get(p.toUri(), new 
Configuration()).open(p)));  
+              .parse((FileSystem.get(p.toUri(), conf).open(p)));
             setInputAvroSchema(s);
             setOutputAvroSchema(s);
           } catch (FileNotFoundException fnfe) {
@@ -179,6 +191,25 @@ public class AvroStorage extends LoadFun
                 "schema was described in a local file on the front end, and 
this message " + 
                 "is in the back end log, you can ignore this mesasge.)", fnfe);
           }
+        } else if (configuredOptions.hasOption('c')) {
+          String schemaClass = configuredOptions.getOptionValue('c');
+          try {
+            Schema s = ((GenericContainer) 
Class.forName(schemaClass).newInstance()).getSchema();
+            setInputAvroSchema(s);
+            setOutputAvroSchema(s);
+          } catch (ClassNotFoundException | IllegalAccessException cnfe) {
+            System.err.printf("class not found exception\n");
+            log.error("Schema class '" + schemaClass + "' was not found in the 
classpath.", cnfe);
+            throw new RuntimeException(cnfe);
+          } catch (InstantiationException ie) {
+            System.err.printf("instantiation exception\n");
+            log.error("Schema class '" + schemaClass + "' must have a public 
empty args constructor.", ie);
+            throw new RuntimeException(ie);
+          } catch (ClassCastException cce) {
+            System.err.printf("class cast exception\n");
+            log.error("Schema class '" + schemaClass + "' must implement 
org.apache.avro.generic.GenericContainer interface.", cce);
+            throw new RuntimeException(cce);
+          }
         } else if (configuredOptions.hasOption('e')) {
           setOutputAvroSchema(
               getAvroSchema(configuredOptions.getOptionValue('e'),

Modified: pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAvg.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAvg.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAvg.java Fri Mar  4 
18:17:39 2016
@@ -18,20 +18,20 @@
 package org.apache.pig.builtin;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.math.BigDecimal;
 import java.math.MathContext;
+import java.util.Iterator;
 
 import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
 
 
 /**
@@ -61,14 +61,17 @@ public class BigDecimalAvg extends EvalF
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -199,7 +202,7 @@ public class BigDecimalAvg extends EvalF
         DataBag values = (DataBag)input.get(0);
 
         // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAvg.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAvg.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAvg.java Fri Mar  4 
18:17:39 2016
@@ -18,21 +18,21 @@
 package org.apache.pig.builtin;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.math.MathContext;
+import java.util.Iterator;
 
 import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
 
 
 /**
@@ -62,14 +62,17 @@ public class BigIntegerAvg extends EvalF
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -201,7 +204,7 @@ public class BigIntegerAvg extends EvalF
         DataBag values = (DataBag)input.get(0);
 
         // if we were handed an empty bag, return NULL
-        if (values.size() == 0) {
+        if (values == null || values.size() == 0) {
             return null;
         }
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/DateTimeMax.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/DateTimeMax.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/DateTimeMax.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/DateTimeMax.java Fri Mar  4 
18:17:39 2016
@@ -20,8 +20,6 @@ package org.apache.pig.builtin;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.joda.time.DateTime;
-
 import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
@@ -32,6 +30,7 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
 
 /**
  * This method should never be used directly, use {@link MAX}.
@@ -47,14 +46,17 @@ public class DateTimeMax extends EvalFun
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -120,7 +122,7 @@ public class DateTimeMax extends EvalFun
 
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 
@@ -153,7 +155,7 @@ public class DateTimeMax extends EvalFun
 
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DATETIME)); 
+        return new Schema(new Schema.FieldSchema(null, DataType.DATETIME));
     }
 
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/DateTimeMin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/DateTimeMin.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/DateTimeMin.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/DateTimeMin.java Fri Mar  4 
18:17:39 2016
@@ -20,8 +20,6 @@ package org.apache.pig.builtin;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.joda.time.DateTime;
-
 import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
@@ -32,6 +30,7 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
 
 /**
  * This method should never be used directly, use {@link MAX}.
@@ -47,14 +46,17 @@ public class DateTimeMin extends EvalFun
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -120,7 +122,7 @@ public class DateTimeMin extends EvalFun
 
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/Distinct.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Distinct.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Distinct.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Distinct.java Fri Mar  4 
18:17:39 2016
@@ -22,9 +22,7 @@ import java.io.IOException;
 
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigConfiguration;
-import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.data.BagFactory;
@@ -45,16 +43,6 @@ public class Distinct  extends EvalFunc<
     private static boolean initialized = false;
     private static boolean useDefaultBag = false;
 
-    static {
-        JVMReuseManager.getInstance().registerForStaticDataCleanup(Distinct.class);
-    }
-
-    @StaticDataCleanup
-    public static void staticDataCleanup() {
-        initialized = false;
-        useDefaultBag = false;
-    }
-
     @Override
     public DataBag exec(Tuple input) throws IOException {
         return getDistinct(input);

Modified: pig/branches/spark/src/org/apache/pig/builtin/DoubleAvg.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/DoubleAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/DoubleAvg.java Fri Mar  4 18:17:39 2016
@@ -18,28 +18,25 @@
 package org.apache.pig.builtin;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Iterator;
 
 import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
 
 
 /**
  * This method should never be used directly, use {@link AVG}.
  */
 public class DoubleAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    
+
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
@@ -56,21 +53,24 @@ public class DoubleAvg extends EvalFunc<
             Double avg = null;
             if (count > 0)
                 avg = new Double(sum / count);
-    
+
             return avg;
         } catch (ExecException ee) {
             throw ee;
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -91,7 +91,7 @@ public class DoubleAvg extends EvalFunc<
                 t.set(0, d);
                 if (d != null){
                     t.set(1, 1L);
-                }else{    
+                } else {
                     t.set(1, 0L);
                 }
                 return t;
@@ -100,9 +100,9 @@ public class DoubleAvg extends EvalFunc<
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
-                
+
         }
     }
 
@@ -117,7 +117,7 @@ public class DoubleAvg extends EvalFunc<
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
        
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -145,7 +145,7 @@ public class DoubleAvg extends EvalFunc<
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
        
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -166,7 +166,7 @@ public class DoubleAvg extends EvalFunc<
             Tuple t = it.next();
             Double d = (Double)t.get(0);
             // we count nulls in avg as contributing 0
-            // a departure from SQL for performance of 
+            // a departure from SQL for performance of
             // COUNT() which implemented by just inspecting
             // size of the bag
             if(d == null) {
@@ -200,9 +200,9 @@ public class DoubleAvg extends EvalFunc<
 
     static protected Double sum(Tuple input) throws ExecException, IOException {
         DataBag values = (DataBag)input.get(0);
-        
+
         // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 
@@ -228,17 +228,17 @@ public class DoubleAvg extends EvalFunc<
             return null;
         }
     }
-    
+
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
     }
-    
+
     /* Accumulator interface */
-    
+
     private Double intermediateSum = null;
     private Double intermediateCount = null;
-    
+
     @Override
     public void accumulate(Tuple b) throws IOException {
         try {
@@ -251,7 +251,7 @@ public class DoubleAvg extends EvalFunc<
                 intermediateSum = 0.0;
                 intermediateCount = 0.0;
             }
-            
+
             double count = (Long)count(b);
 
             if (count > 0) {
@@ -263,9 +263,9 @@ public class DoubleAvg extends EvalFunc<
         } catch (Exception e) {
             int errCode = 2106;
             String msg = "Error while computing average in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+            throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-    }        
+    }
 
     @Override
     public void cleanup() {
@@ -280,5 +280,5 @@ public class DoubleAvg extends EvalFunc<
             avg = new Double(intermediateSum / intermediateCount);
         }
         return avg;
-    }    
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/builtin/FloatAvg.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/FloatAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/FloatAvg.java Fri Mar  4 18:17:39 2016
@@ -24,19 +24,19 @@ import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
 
 
 /**
  * This method should never be used directly, use {@link AVG}.
  */
 public class FloatAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    
+
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
@@ -53,21 +53,24 @@ public class FloatAvg extends EvalFunc<D
             Double avg = null;
             if (count > 0)
                 avg = new Double(sum / count);
-    
+
             return avg;
         } catch (ExecException ee) {
             throw ee;
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -96,9 +99,9 @@ public class FloatAvg extends EvalFunc<D
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
-                
+
         }
     }
 
@@ -113,7 +116,7 @@ public class FloatAvg extends EvalFunc<D
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -141,7 +144,7 @@ public class FloatAvg extends EvalFunc<D
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -162,7 +165,7 @@ public class FloatAvg extends EvalFunc<D
             Tuple t = it.next();
             Double d = (Double)t.get(0);
             // we count nulls in avg as contributing 0
-            // a departure from SQL for performance of 
+            // a departure from SQL for performance of
             // COUNT() which implemented by just inspecting
             // size of the bag
             if(d == null) {
@@ -199,7 +202,7 @@ public class FloatAvg extends EvalFunc<D
         DataBag values = (DataBag)input.get(0);
 
         // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 
@@ -225,17 +228,17 @@ public class FloatAvg extends EvalFunc<D
             return null;
         }
     }
-    
+
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
     }
-    
+
     /* Accumulator interface */
 
     private Double intermediateSum = null;
     private Double intermediateCount = null;
-    
+
     @Override
     public void accumulate(Tuple b) throws IOException {
         try {
@@ -248,9 +251,9 @@ public class FloatAvg extends EvalFunc<D
                 intermediateSum = 0.0;
                 intermediateCount = 0.0;
             }
-            
+
             double count = (Long)count(b);
-            
+
             if (count > 0) {
                 intermediateCount += count;
                 intermediateSum += sum;
@@ -260,9 +263,9 @@ public class FloatAvg extends EvalFunc<D
         } catch (Exception e) {
             int errCode = 2106;
             String msg = "Error while computing average in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+            throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-    }        
+    }
 
     @Override
     public void cleanup() {
@@ -277,6 +280,6 @@ public class FloatAvg extends EvalFunc<D
             avg = new Double(intermediateSum / intermediateCount);
         }
         return avg;
-    }    
+    }
 
 }

Modified: pig/branches/spark/src/org/apache/pig/builtin/IntAvg.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/IntAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/IntAvg.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/IntAvg.java Fri Mar  4 18:17:39 2016
@@ -24,19 +24,19 @@ import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
 
 
 /**
  * This method should never be used directly, use {@link AVG}.
  */
 public class IntAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    
+
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
@@ -53,21 +53,24 @@ public class IntAvg extends EvalFunc<Dou
             Double avg = null;
             if (count > 0)
                 avg = new Double(sum / count);
-    
+
             return avg;
         } catch (ExecException ee) {
             throw ee;
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -75,7 +78,7 @@ public class IntAvg extends EvalFunc<Dou
     static public class Initial extends EvalFunc<Tuple> {
         @Override
         public Tuple exec(Tuple input) throws IOException {
-            
+
             try {
                 Tuple t = mTupleFactory.newTuple(2);
                 // input is a bag with one tuple containing
@@ -97,9 +100,9 @@ public class IntAvg extends EvalFunc<Dou
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
-                
+
         }
     }
 
@@ -114,7 +117,7 @@ public class IntAvg extends EvalFunc<Dou
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -142,7 +145,7 @@ public class IntAvg extends EvalFunc<Dou
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -163,7 +166,7 @@ public class IntAvg extends EvalFunc<Dou
             Tuple t = it.next();
             Long l = (Long)t.get(0);
             // we count nulls in avg as contributing 0
-            // a departure from SQL for performance of 
+            // a departure from SQL for performance of
             // COUNT() which implemented by just inspecting
             // size of the bag
             if(l == null) {
@@ -200,7 +203,7 @@ public class IntAvg extends EvalFunc<Dou
         DataBag values = (DataBag)input.get(0);
 
         // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 
@@ -211,7 +214,7 @@ public class IntAvg extends EvalFunc<Dou
             try {
                 Integer i = (Integer)(t.get(0));
                 // we count nulls in avg as contributing 0
-                // a departure from SQL for performance of 
+                // a departure from SQL for performance of
                 // COUNT() which implemented by just inspecting
                 // size of the bag
                 if (i == null) continue;
@@ -230,17 +233,17 @@ public class IntAvg extends EvalFunc<Dou
             return null;
         }
     }
-    
+
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
     }
 
     /* Accumulator interface */
 
     private Long intermediateSum = null;
     private Double intermediateCount = null;
-    
+
     @Override
     public void accumulate(Tuple b) throws IOException {
         try {
@@ -253,7 +256,7 @@ public class IntAvg extends EvalFunc<Dou
                 intermediateSum = 0L;
                 intermediateCount = 0.0;
             }
-            
+
             double count = (Long)count(b);
 
             if (count > 0) {
@@ -265,9 +268,9 @@ public class IntAvg extends EvalFunc<Dou
         } catch (Exception e) {
             int errCode = 2106;
             String msg = "Error while computing average in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+            throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-    }        
+    }
 
     @Override
     public void cleanup() {

Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java Fri Mar  4 18:17:39 2016
@@ -25,14 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.math.BigDecimal;
 
-import org.joda.time.format.ISODateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -56,37 +49,44 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
 
 /**
  * A loader for data stored using {@link JsonStorage}.  This is not a generic
  * JSON loader. It depends on the schema being stored with the data when
  * conceivably you could write a loader that determines the schema from the
- * JSON. 
+ * JSON.
  */
 public class JsonLoader extends LoadFunc implements LoadMetadata {
 
     protected RecordReader reader = null;
     protected ResourceSchema schema = null;
-    
+
     private String udfcSignature = null;
     private JsonFactory jsonFactory = null;
     private TupleFactory tupleFactory = TupleFactory.getInstance();
     private BagFactory bagFactory = BagFactory.getInstance();
-    
+
     private static final String SCHEMA_SIGNATURE = "pig.jsonloader.schema";
-    
+
     public JsonLoader() {
     }
-    
+
     public JsonLoader(String schemaString) throws IOException {
         schema = new ResourceSchema(Utils.parseSchema(schemaString));
     }
 
+    @Override
     public void setLocation(String location, Job job) throws IOException {
         // Tell our input format where we will be reading from
         FileInputFormat.setInputPaths(job, location);
     }
-    
+
+    @Override
     @SuppressWarnings("unchecked")
     public InputFormat getInputFormat() throws IOException {
         // We will use TextInputFormat, the default Hadoop input format for
@@ -95,17 +95,19 @@ public class JsonLoader extends LoadFunc
         return new TextInputFormat();
     }
 
+    @Override
     public LoadCaster getLoadCaster() throws IOException {
         // We do not expect to do casting of byte arrays, because we will be
         // returning typed data.
         return null;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public void prepareToRead(RecordReader reader, PigSplit split)
     throws IOException {
         this.reader = reader;
-        
+
         // Get the schema string from the UDFContext object.
         UDFContext udfc = UDFContext.getUDFContext();
         Properties p =
@@ -121,6 +123,7 @@ public class JsonLoader extends LoadFunc
         jsonFactory = new JsonFactory();
     }
 
+    @Override
     public Tuple getNext() throws IOException {
         Text val = null;
         try {
@@ -150,31 +153,34 @@ public class JsonLoader extends LoadFunc
         // isn't what we expect we return a tuple with null fields rather than
         // throwing an exception.  That way a few mangled lines don't fail the
         // job.
-        
+
         try {
             if (p.nextToken() != JsonToken.START_OBJECT) {
                 warn("Bad record, could not find start of record " +
                     val.toString(), PigWarning.UDF_WARNING_1);
                 return t;
             }
-    
+
             // Read each field in the record
             for (int i = 0; i < fields.length; i++) {
                 t.set(i, readField(p, fields[i], i));
             }
-    
+
             if (p.nextToken() != JsonToken.END_OBJECT) {
                 warn("Bad record, could not find end of record " +
                     val.toString(), PigWarning.UDF_WARNING_1);
                 return t;
             }
-            
+
         } catch (Exception jpe) {
-            warn("Bad record, returning null for " + val, PigWarning.UDF_WARNING_1);
+            Throwable ex = jpe.getCause() == null ? jpe : jpe.getCause();
+            warn("Encountered exception " + ex.getClass().getName() + ": "
+                    + ex.getMessage() + ". Bad record, returning null for "
+                    + val, PigWarning.UDF_WARNING_1);
         } finally {
             p.close();
         }
-        
+
         return t;
     }
 
@@ -186,44 +192,44 @@ public class JsonLoader extends LoadFunc
             // Read based on our expected type
             case DataType.BOOLEAN:
                 return p.getBooleanValue();
-    
+
             case DataType.INTEGER:
                 return p.getIntValue();
-    
+
             case DataType.LONG:
                 return p.getLongValue();
-    
+
             case DataType.FLOAT:
                 return p.getFloatValue();
-    
+
             case DataType.DOUBLE:
                 return p.getDoubleValue();
-    
+
             case DataType.DATETIME:
                 DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser();
                 return formatter.withOffsetParsed().parseDateTime(p.getText());
-    
+
             case DataType.BYTEARRAY:
                 byte[] b = p.getText().getBytes();
                 // Use the DBA constructor that copies the bytes so that we own
                 // the memory
                 return new DataByteArray(b, 0, b.length);
-    
+
             case DataType.CHARARRAY:
                 return p.getText();
-    
+
             case DataType.BIGINTEGER:
                 return p.getBigIntegerValue();
-    
+
             case DataType.BIGDECIMAL:
                 return new BigDecimal(p.getText());
-    
+
             default:
                 throw new IOException("Unknown type in input schema: " +
                         field.getType() );
         }
     }
-    
+
     private Object readField(JsonParser p,
                              ResourceFieldSchema field,
                              int fieldnum) throws IOException {
@@ -237,7 +243,7 @@ public class JsonLoader extends LoadFunc
 
         // Check to see if this value was null
         if (tok == JsonToken.VALUE_NULL) return null;
-        
+
         tok = p.nextToken();
 
         // Read based on our expected type
@@ -324,11 +330,13 @@ public class JsonLoader extends LoadFunc
     }
 
     //------------------------------------------------------------------------
-    
+
+    @Override
     public void setUDFContextSignature(String signature) {
         udfcSignature = signature;
     }
 
+    @Override
     public ResourceSchema getSchema(String location, Job job)
     throws IOException {
 
@@ -338,12 +346,12 @@ public class JsonLoader extends LoadFunc
         } else {
             // Parse the schema
             s = (new JsonMetadata()).getSchema(location, job, true);
-    
+
             if (s == null) {
                 throw new IOException("Unable to parse schema found in file in " + location);
             }
         }
-    
+
         // Now that we have determined the schema, store it in our
         // UDFContext properties object so we have it when we need it on the
         // backend
@@ -355,18 +363,21 @@ public class JsonLoader extends LoadFunc
         return s;
     }
 
-    public ResourceStatistics getStatistics(String location, Job job) 
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job)
     throws IOException {
         // We don't implement this one.
         return null;
     }
 
+    @Override
     public String[] getPartitionKeys(String location, Job job)
     throws IOException {
         // We don't have partitions
         return null;
     }
 
+    @Override
     public void setPartitionFilter(Expression partitionFilter)
     throws IOException {
         // We don't have partitions

Modified: pig/branches/spark/src/org/apache/pig/builtin/LongAvg.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/LongAvg.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/LongAvg.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/LongAvg.java Fri Mar  4 18:17:39 2016
@@ -24,19 +24,19 @@ import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
 
 
 /**
  * This method should never be used directly, use {@link AVG}.
  */
 public class LongAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    
+
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     @Override
@@ -53,21 +53,24 @@ public class LongAvg extends EvalFunc<Do
             Double avg = null;
             if (count > 0)
                 avg = new Double(sum / count);
-    
+
             return avg;
         } catch (ExecException ee) {
             throw ee;
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -96,9 +99,9 @@ public class LongAvg extends EvalFunc<Do
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
-                
+
         }
     }
 
@@ -113,7 +116,7 @@ public class LongAvg extends EvalFunc<Do
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -141,7 +144,7 @@ public class LongAvg extends EvalFunc<Do
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing average in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -162,7 +165,7 @@ public class LongAvg extends EvalFunc<Do
             Tuple t = it.next();
             Long l = (Long)t.get(0);
             // we count nulls in avg as contributing 0
-            // a departure from SQL for performance of 
+            // a departure from SQL for performance of
             // COUNT() which implemented by just inspecting
             // size of the bag
             if(l == null) {
@@ -199,7 +202,7 @@ public class LongAvg extends EvalFunc<Do
         DataBag values = (DataBag)input.get(0);
 
         // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 
@@ -225,17 +228,17 @@ public class LongAvg extends EvalFunc<Do
             return null;
         }
     }
-    
+
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
     }
-    
+
     /* Accumulator interface */
-   
+
     private Long intermediateSum = null;
     private Double intermediateCount = null;
-    
+
     @Override
     public void accumulate(Tuple b) throws IOException {
         try {
@@ -248,7 +251,7 @@ public class LongAvg extends EvalFunc<Do
                 intermediateSum = 0L;
                 intermediateCount = 0.0;
             }
-            
+
             double count = (Long)count(b);
 
             if (count > 0) {
@@ -260,9 +263,9 @@ public class LongAvg extends EvalFunc<Do
         } catch (Exception e) {
             int errCode = 2106;
             String msg = "Error while computing average in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+            throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-    }        
+    }
 
     @Override
     public void cleanup() {

Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Fri Mar  4 18:17:39 2016
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -91,7 +92,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
-import org.apache.pig.impl.util.orc.OrcUtils;
+import org.apache.pig.impl.util.hive.HiveUtils;
 import org.joda.time.DateTime;
 
 import com.esotericsoftware.kryo.io.Input;
@@ -235,7 +236,7 @@ public class OrcStorage extends LoadFunc
             typeInfo = 
(TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature + 
SchemaSignatureSuffix));
         }
         if (oi==null) {
-            oi = OrcUtils.createObjectInspector(typeInfo);
+            oi = HiveUtils.createObjectInspector(typeInfo);
         }
     }
 
@@ -244,7 +245,7 @@ public class OrcStorage extends LoadFunc
         ResourceFieldSchema fs = new ResourceFieldSchema();
         fs.setType(DataType.TUPLE);
         fs.setSchema(rs);
-        typeInfo = OrcUtils.getTypeInfo(fs);
+        typeInfo = HiveUtils.getTypeInfo(fs);
         Properties p = 
UDFContext.getUDFContext().getUDFProperties(this.getClass());
         p.setProperty(signature + SchemaSignatureSuffix, 
ObjectSerializer.serialize(typeInfo));
     }
@@ -376,7 +377,7 @@ public class OrcStorage extends LoadFunc
             }
             Object value = in.getCurrentValue();
 
-            Tuple t = (Tuple)OrcUtils.convertOrcToPig(value, oi, 
mRequiredColumns);
+            Tuple t = (Tuple)HiveUtils.convertHiveToPig(value, oi, 
mRequiredColumns);
             return t;
         } catch (InterruptedException e) {
             int errCode = 6018;
@@ -406,7 +407,7 @@ public class OrcStorage extends LoadFunc
         return FuncUtils.getShipFiles(classList);
     }
 
-    private static Path getFirstFile(String location, FileSystem fs) throws 
IOException {
+    private static Path getFirstFile(String location, FileSystem fs, 
PathFilter filter) throws IOException {
         String[] locations = getPathStrings(location);
         Path[] paths = new Path[locations.length];
         for (int i = 0; i < paths.length; ++i) {
@@ -423,7 +424,7 @@ public class OrcStorage extends LoadFunc
         }
         FileStatus[] statusArray = (FileStatus[]) statusList
                 .toArray(new FileStatus[statusList.size()]);
-        Path p = Utils.depthFirstSearchForFile(statusArray, fs);
+        Path p = Utils.depthFirstSearchForFile(statusArray, fs, filter);
         return p;
     }
 
@@ -438,7 +439,7 @@ public class OrcStorage extends LoadFunc
             }
         }
 
-        ResourceFieldSchema fs = OrcUtils.getResourceFieldSchema(typeInfo);
+        ResourceFieldSchema fs = HiveUtils.getResourceFieldSchema(typeInfo);
         return fs.getSchema();
     }
 
@@ -456,7 +457,7 @@ public class OrcStorage extends LoadFunc
 
     private TypeInfo getTypeInfoFromLocation(String location, Job job) throws 
IOException {
         FileSystem fs = FileSystem.get(job.getConfiguration());
-        Path path = getFirstFile(location, fs);
+        Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs));
         if (path == null) {
             log.info("Cannot find any ORC files from " + location +
                     ". Probably multiple load store in script.");
@@ -467,6 +468,28 @@ public class OrcStorage extends LoadFunc
         return TypeInfoUtils.getTypeInfoFromObjectInspector(oip);
     }
 
+    public static class NonEmptyOrcFileFilter implements PathFilter {
+        private FileSystem fs;
+        public NonEmptyOrcFileFilter(FileSystem fs) {
+            this.fs = fs;
+        }
+        @Override
+        public boolean accept(Path path) {
+            Reader reader;
+            try {
+                reader = OrcFile.createReader(fs, path);
+                ObjectInspector oip = 
(ObjectInspector)reader.getObjectInspector();
+                ResourceFieldSchema rs = 
HiveUtils.getResourceFieldSchema(TypeInfoUtils.getTypeInfoFromObjectInspector(oip));
+                if (rs.getSchema().getFields().length!=0) {
+                    return true;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return false;
+        }
+    }
+
     @Override
     public ResourceStatistics getStatistics(String location, Job job)
             throws IOException {

Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Fri Mar  4 
18:17:39 2016
@@ -54,6 +54,7 @@ import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.OverwritableStoreFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -67,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -159,6 +161,10 @@ LoadPushDown, LoadMetadata, StoreMetadat
     private static final String TAG_SOURCE_PATH = "tagPath";
     private Path sourcePath = null;
 
+    // it determines whether to depend on pig's own Bzip2TextInputFormat or
+    // to simply depend on hadoop for handling bzip2 inputs
+    private boolean bzipinput_usehadoops ;
+
     private Options populateValidOptions() {
         Options validOptions = new Options();
         validOptions.addOption("schema", false, "Loads / Stores the schema of 
the relation using a hidden JSON file.");
@@ -419,9 +425,12 @@ LoadPushDown, LoadMetadata, StoreMetadat
 
     @Override
     public InputFormat getInputFormat() {
-        if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+        if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
+           && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
+            mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {
+            mLog.info("Using PigTextInputFormat");
             return new PigTextInputFormat();
         }
     }
@@ -439,6 +448,9 @@ LoadPushDown, LoadMetadata, StoreMetadat
     throws IOException {
         loadLocation = location;
         FileInputFormat.setInputPaths(job, location);
+        bzipinput_usehadoops = job.getConfiguration().getBoolean(
+                                  
PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT,
+                                  true );
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/builtin/PluckTuple.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PluckTuple.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/PluckTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/PluckTuple.java Fri Mar  4 
18:17:39 2016
@@ -20,6 +20,7 @@ package org.apache.pig.builtin;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.DataType;
@@ -35,6 +36,8 @@ import com.google.common.collect.Lists;
  * filter for the columns in a relation that begin with that prefix.
  *
  * Example:
+ *
+ * 1) Prefix
  * a = load 'a' as (x, y);
  * b = load 'b' as (x, y);
  * c = join a by x, b by x;
@@ -44,16 +47,35 @@ import com.google.common.collect.Lists;
  * c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray}
  * describe d;
  * d: {plucked::a::x: bytearray,plucked::a::y: bytearray}
+ *
+ * 2) Regex
+ * a = load 'a' as (x, y);
+ * b = load 'b' as (x, y);
+ * c = join a by x, b by x;
+ * DEFINE pluck PluckTuple('.*::y');
+ * d = foreach c generate FLATTEN(pluck(*));
+ * describe c;
+ * c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray}
+ * describe d;
+ * d: {plucked::a::y: bytearray,plucked::b::y: bytearray}
  */
 public class PluckTuple extends EvalFunc<Tuple> {
     private static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
+    private Pattern pattern;
 
     private boolean isInitialized = false;
     private int[] indicesToInclude;
     private String prefix;
+    private boolean match;
 
     public PluckTuple(String prefix) {
+        this(prefix,"true");
+    }
+
+    public PluckTuple(String prefix, String match) {
         this.prefix = prefix;
+        this.match = Boolean.valueOf(match);
+        pattern = Pattern.compile(prefix);
     }
 
     @Override
@@ -63,7 +85,10 @@ public class PluckTuple extends EvalFunc
             Schema inputSchema = getInputSchema();
             for (int i = 0; i < inputSchema.size(); i++) {
                 String alias = inputSchema.getField(i).alias;
-                if (alias.startsWith(prefix)) {
+                if (this.match && (alias.startsWith(prefix) || 
pattern.matcher(alias).matches()) ) {
+                    indicesToInclude.add(i);
+                }
+                else if (!this.match && !alias.startsWith(prefix) && 
!pattern.matcher(alias).matches() ){
                     indicesToInclude.add(i);
                 }
             }
@@ -92,7 +117,10 @@ public class PluckTuple extends EvalFunc
                 } catch (FrontendException e) {
                     throw new RuntimeException(e); // Should never happen
                 }
-                if (alias.startsWith(prefix)) {
+                if (this.match && (alias.startsWith(prefix) || 
pattern.matcher(alias).matches())) {
+                    indicesToInclude.add(i);
+                }
+                else if (!this.match && !alias.startsWith(prefix) && 
!pattern.matcher(alias).matches()){
                     indicesToInclude.add(i);
                 }
             }

Modified: pig/branches/spark/src/org/apache/pig/builtin/RANDOM.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RANDOM.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/RANDOM.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/RANDOM.java Fri Mar  4 
18:17:39 2016
@@ -22,6 +22,10 @@ import java.io.IOException;
 import java.util.Random;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.StaticDataCleanup;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.data.DataType;
@@ -32,23 +36,60 @@ import org.apache.pig.data.DataType;
  */
 @Nondeterministic
 public class RANDOM extends EvalFunc<Double>{
-    private Random r;
+    private Random r = null;
 
     public RANDOM() {
-        r = new Random();
     }
 
     public RANDOM(String seed) {
         r = new Random(Long.parseLong(seed));
     }
 
-       @Override
-       public Double exec(Tuple input) throws IOException {
-               return r.nextDouble();
-       }
+    @Override
+    public Double exec(Tuple input) throws IOException {
+        if( r == null ) {
+            int jobidhash = 
PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode();
+            int taskIndex = 
Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
+
+            // XOR-ing 3 separate values
+            // |<-----32 bits---->|<----32 bits----->|
+            // |-jobidhash(int)---|-jobidhash(int)---|
+            // |        |---taskIndex(int)--|
+            // |----------seedUniquifier (long)------|
+            //          |                            |
+            //          |<-- Only 48 bits used ----->|
+            //          |    by java.util.Random     |
+            //          |                            |
+            //
+            // The reason for repeating jobidhash and shifting taskIndex is that
+            // seeds too close to each other would produce very similar values.
+            //
+            // The goal of this method is to produce pseudo-random values that
+            // would
+            // (1) Produce the same sequence of pseudo-random values for 
attempts from same jobid/vertexid and taskid
+            // (2) When taskid, jobid, or vertexid(tez) differ, they should 
produce a different random sequence
+            // (3) When Random is called more than once inside the script, 
they should also produce different random values
+            //     e.g. B = FOREACH A generate RANDOM(), RANDOM();
+            //
+            r = new Random( (((long) jobidhash) << 32 | (jobidhash &  
0xffffffffL))  ^ ((long) taskIndex << 16) ^ seedUniquifier);
+
+            // L'Ecuyer, "Tables of Linear Congruential Generators of
+            // Different Sizes and Good Lattice Structure", 1999
+            seedUniquifier *= 4292484099903637661L;
+        }
+        return r.nextDouble();
+    }
 
     @Override
     public Schema outputSchema(Schema input) {
         return new Schema(new 
Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), 
input), DataType.DOUBLE));
     }
+
+    // Taking the initial seed value from java.util.Random
+    private static long seedUniquifier = 8682522807148012L;
+
+    @StaticDataCleanup
+    public static void resetSeedUniquifier() {
+        seedUniquifier = 8682522807148012L;
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/builtin/REPLACE.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/REPLACE.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/REPLACE.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/REPLACE.java Fri Mar  4 
18:17:39 2016
@@ -21,14 +21,15 @@ package org.apache.pig.builtin;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigWarning;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 /**
  * REPLACE implements eval function to replace part of a string.
@@ -42,6 +43,8 @@ import org.apache.pig.impl.logicalLayer.
  */
 public class REPLACE extends EvalFunc<String>
 {
+    private Pattern mPattern = null;
+
     /**
      * Method invoked on every tuple during foreach evaluation
      * @param input tuple; first column is assumed to have the column to 
convert
@@ -52,13 +55,29 @@ public class REPLACE extends EvalFunc<St
         if (input == null || input.size() < 3)
             return null;
 
-        try{
-            String source = (String)input.get(0);
-            String target = (String)input.get(1);
-            String replacewith = (String)input.get(2);
-            return source.replaceAll(target, replacewith);
+        String source = (String)input.get(0);
+        String target = (String)input.get(1);
+
+        if (target == null) {
+            warn("Replace : Regular expression is null", 
PigWarning.UDF_WARNING_1);
+            return null;
+        }
+        
+        if (mPattern == null || ! target.equals(mPattern.pattern())) {
+            try {
+                mPattern = Pattern.compile(target);
+            } catch (Exception e) {
+                warn("Replace : Mal-Formed Regular expression : " + target, 
PigWarning.UDF_WARNING_1);
+                return null;
+            }
+        }            
+        
+        String replacewith = (String)input.get(2);
+        
+        try {    
+           return mPattern.matcher(source).replaceAll(replacewith);
         }catch(Exception e){
-            warn("Failed to process input; error - " + e.getMessage(), 
PigWarning.UDF_WARNING_1);
+            warn("Replace : Failed to process input; error - " + 
e.getMessage(), PigWarning.UDF_WARNING_1);
             return null;
         }
     }

Modified: pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java Fri Mar 
 4 18:17:39 2016
@@ -47,10 +47,6 @@ public class RollupDimensions extends Ev
     private static BagFactory bf = BagFactory.getInstance();
     private static TupleFactory tf = TupleFactory.getInstance();
     private final String allMarker;
-    // the pivot position
-    private int pivot = -1;
-    // to check if rollup is optimized or not
-    private boolean rollupHIIoptimizable = false;
 
     public RollupDimensions() {
        this(null);
@@ -61,18 +57,6 @@ public class RollupDimensions extends Ev
        this.allMarker = allMarker;
     }
 
-    public void setRollupHIIOptimizable(boolean check) {
-        this.rollupHIIoptimizable = check;
-    }
-
-    public boolean getRollupHIIOptimizable() {
-        return this.rollupHIIoptimizable;
-    }
-
-    public void setPivot(int pvt) throws IOException {
-        this.pivot = pvt;
-    }
-
     @Override
     public DataBag exec(Tuple tuple) throws IOException {
        List<Tuple> result = Lists.newArrayListWithCapacity(tuple.size() + 1);
@@ -82,32 +66,12 @@ public class RollupDimensions extends Ev
        return bf.newDefaultBag(result);
     }
 
-    private void iterativelyRollup(List<Tuple> result, Tuple input)
-            throws IOException {
-
-        Tuple tempTup = tf.newTuple(input.getAll());
-
-        //if (this.rollupHIIoptimizable != null) { // rule is enabled
-            if (this.rollupHIIoptimizable == true) {
-                if (this.pivot == -1) // user did not specify the pivot 
position
-                                      // --> IRG approach
-                    return;
-                else { // user did specify the pivot position --> IRG + IRG
-                    if (this.pivot == 0) // we use the IRG approach
-                        return;
-                    else { // we use IRG+IRG approach
-                        for (int i = this.pivot - 1; i < input.size(); i++)
-                            tempTup.set(i, allMarker);
-                        result.add(tf.newTuple(tempTup.getAll()));
-                    }
-                }
-            }
-            else { // we can not optimize --> Vanilla approach
-            for (int i = input.size() - 1; i >= 0; i--) {
-                tempTup.set(i, allMarker);
-                result.add(tf.newTuple(tempTup.getAll()));
-            }
-        }
+    private void iterativelyRollup(List<Tuple> result, Tuple input) throws 
ExecException {
+       Tuple tempTup = tf.newTuple(input.getAll());
+       for (int i = input.size() - 1; i >= 0; i--) {
+           tempTup.set(i, allMarker);
+           result.add(tf.newTuple(tempTup.getAll()));
+       }
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/builtin/StringMax.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/StringMax.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/StringMax.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/StringMax.java Fri Mar  4 
18:17:39 2016
@@ -45,14 +45,17 @@ public class StringMax extends EvalFunc<
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -77,7 +80,7 @@ public class StringMax extends EvalFunc<
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing max in " + 
this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -94,7 +97,7 @@ public class StringMax extends EvalFunc<
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing max in " + 
this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -108,17 +111,17 @@ public class StringMax extends EvalFunc<
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing max in " + 
this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
 
     static protected String max(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);
-        
+
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 
@@ -129,7 +132,7 @@ public class StringMax extends EvalFunc<
             Tuple t = it.next();
             curMax = (String)(t.get(0));
         }
-        
+
         for (; it.hasNext();) {
             Tuple t = it.next();
             try {
@@ -138,26 +141,26 @@ public class StringMax extends EvalFunc<
                 if( s.compareTo(curMax) > 0) {
                     curMax = s;
                 }
-                
+
             } catch (RuntimeException exp) {
                 int errCode = 2103;
                 String msg = "Problem while computing max of strings.";
                 throw new ExecException(msg, errCode, PigException.BUG, exp);
             }
         }
-    
+
         return curMax;
     }
 
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); 
+        return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
     }
 
 
     /* accumulator interface */
     private String intermediateMax = null;
-    
+
     @Override
     public void accumulate(Tuple b) throws IOException {
         try {
@@ -166,16 +169,16 @@ public class StringMax extends EvalFunc<
                 return;
             }
             // check if it lexicographically follows curMax
-            if (intermediateMax == null || intermediateMax.compareTo(curMax) > 
0) {
+            if (intermediateMax == null || intermediateMax.compareTo(curMax) < 
0) {
                 intermediateMax = curMax;
-            }            
+            }
 
         } catch (ExecException ee) {
             throw ee;
         } catch (Exception e) {
             int errCode = 2106;
             String msg = "Error while computing max in " + 
this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+            throw new ExecException(msg, errCode, PigException.BUG, e);
         }
     }
 



Reply via email to