Modified: pig/branches/spark/src/org/apache/pig/builtin/StringMin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/StringMin.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/StringMin.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/StringMin.java Fri Mar  4 
18:17:39 2016
@@ -46,14 +46,17 @@ public class StringMin extends EvalFunc<
         }
     }
 
+    @Override
     public String getInitial() {
         return Initial.class.getName();
     }
 
+    @Override
     public String getIntermed() {
         return Intermediate.class.getName();
     }
 
+    @Override
     public String getFinal() {
         return Final.class.getName();
     }
@@ -78,7 +81,7 @@ public class StringMin extends EvalFunc<
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing min in " + 
this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -95,7 +98,7 @@ public class StringMin extends EvalFunc<
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing min in " + 
this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
@@ -109,17 +112,17 @@ public class StringMin extends EvalFunc<
             } catch (Exception e) {
                 int errCode = 2106;
                 String msg = "Error while computing min in " + 
this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);    
       
+                throw new ExecException(msg, errCode, PigException.BUG, e);
             }
         }
     }
 
     static protected String min(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);
-        
+
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
-        if(values.size() == 0) {
+        if(values == null || values.size() == 0) {
             return null;
         }
 
@@ -130,7 +133,7 @@ public class StringMin extends EvalFunc<
             Tuple t = it.next();
             curMin = (String)(t.get(0));
         }
-        
+
         for (; it.hasNext();) {
             Tuple t = it.next();
             try {
@@ -139,25 +142,25 @@ public class StringMin extends EvalFunc<
                 if( s.compareTo(curMin) < 0) {
                     curMin = s;
                 }
-                
+
             } catch (RuntimeException exp) {
                 int errCode = 2103;
                 String msg = "Problem while computing min of strings.";
                 throw new ExecException(msg, errCode, PigException.BUG, exp);
             }
         }
-    
+
         return curMin;
     }
 
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); 
+        return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
     }
-    
+
     /* accumulator interface */
     private String intermediateMin = null;
-    
+
     @Override
     public void accumulate(Tuple b) throws IOException {
         try {
@@ -166,16 +169,16 @@ public class StringMin extends EvalFunc<
                 return;
             }
             // check if it lexicographically follows curMax
-            if (intermediateMin == null || intermediateMin.compareTo(curMin) < 
0) {
+            if (intermediateMin == null || intermediateMin.compareTo(curMin) > 
0) {
                 intermediateMin = curMin;
-            }            
+            }
 
         } catch (ExecException ee) {
             throw ee;
         } catch (Exception e) {
             int errCode = 2106;
             String msg = "Error while computing max in " + 
this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+            throw new ExecException(msg, errCode, PigException.BUG, e);
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/TOMAP.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TOMAP.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TOMAP.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TOMAP.java Fri Mar  4 
18:17:39 2016
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.HashMap;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -30,27 +31,53 @@ import org.apache.pig.impl.logicalLayer.
  * This class makes a map out of the parameters passed to it
  * T = foreach U generate TOMAP($0, $1, $2, $3);
  * It generates a map $0->1, $2->$3
+ *
+ * This UDF also accepts a bag with 'pair' tuples (i.e. tuples with a 'key' 
and a 'value').
+ *
  */
 public class TOMAP extends EvalFunc<Map> {
 
     @Override
     public Map exec(Tuple input) throws IOException {
-       if (input == null || input.size() < 2)
-               return null;
+        if (input == null || input.size() == 0) {
+            return null;
+        }
+
+        Map<String, Object> output = new HashMap<String, Object>();
+
         try {
-           Map<String, Object> output = new HashMap<String, Object>();
+            // Is this a single bag with all the values?
+            if (input.size() == 1) {
+                if (input.get(0) instanceof DataBag) {
+                    DataBag bagOfPairs = (DataBag)input.get(0);
+                    if (bagOfPairs.size() == 0) {
+                        return output;
+                    }
+
+                    for (Tuple tuple: bagOfPairs) {
+                        if (tuple.size() != 2) {
+                            throw new RuntimeException("All input tuples in 
the bag MUST have exactly 2 fields");
+                        }
+                        String key = (String)tuple.get(0);
+                        Object val = tuple.get(1);
+                        output.put(key, val);
+                    }
+                    return output;
+                } else {
+                    return null; // If only 1 value then it must be a bag
+                }
+            }
 
             for (int i = 0; i < input.size(); i=i+2) {
-               String key = (String)input.get(i);
-               Object val = input.get(i+1);    
-               output.put(key, val);
+                String key = (String)input.get(i);
+                Object val = input.get(i+1);
+                output.put(key, val);
             }
-
-           return output;
-       } catch (ClassCastException e){
-               throw new RuntimeException("Map key must be a String");
-       } catch (ArrayIndexOutOfBoundsException e){
-               throw new RuntimeException("Function input must have even 
number of parameters");
+            return output;
+        } catch (ClassCastException e){
+            throw new RuntimeException("Map key must be a String");
+        } catch (ArrayIndexOutOfBoundsException e){
+            throw new RuntimeException("Function input must have even number 
of parameters");
         } catch (Exception e) {
             throw new RuntimeException("Error while creating a map", e);
         }
@@ -58,7 +85,34 @@ public class TOMAP extends EvalFunc<Map>
 
     @Override
     public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.MAP));
+        Byte valueType = null;
+        if (input.size() == 1) {
+            // If input is bag with 'pair' tuples
+            Schema bagSchema = input.getFields().get(0).schema;
+            if (bagSchema != null && bagSchema.size() == 1) {
+                Schema tupleSchema = bagSchema.getFields().get(0).schema;
+                if (tupleSchema != null) {
+                    valueType = tupleSchema.getFields().get(1).type;
+                }
+            }
+        } else if (input != null && input.getFields()!=null) {
+            for (int i=0;i<input.size();i++) {
+                if (i % 2 == 1) {
+                    if (valueType == null) {
+                        valueType = input.getFields().get(i).type;
+                    } else if  (valueType != input.getFields().get(i).type) {
+                        valueType = DataType.BYTEARRAY;
+                        break;
+                    }
+                }
+            }
+        }
+        Schema s = new Schema(new Schema.FieldSchema(null, DataType.MAP));
+        if (valueType != null && valueType != DataType.BYTEARRAY) {
+            s.getFields().get(0).schema = new Schema(new 
Schema.FieldSchema(null, valueType));
+            return s;
+        }
+        return s;
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/builtin/TOP.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TOP.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TOP.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TOP.java Fri Mar  4 18:17:39 
2016
@@ -27,6 +27,7 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.AccumulatorEvalFunc;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -68,7 +69,7 @@ import org.apache.pig.impl.logicalLayer.
  *          GENERATE FLATTEN(result); *          
  *  }
  */
-public class TOP extends EvalFunc<DataBag> implements Algebraic{
+public class TOP extends AccumulatorEvalFunc<DataBag> implements Algebraic {
     private static final Log log = LogFactory.getLog(TOP.class);
     private static BagFactory mBagFactory = BagFactory.getInstance();
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -139,35 +140,27 @@ public class TOP extends EvalFunc<DataBa
         }
     }
 
+    // for Accumulator interface
+    private PriorityQueue<Tuple> store = null;
+    
     @Override
-    public DataBag exec(Tuple tuple) throws IOException {
+    public void accumulate(Tuple tuple) throws IOException {
         if (tuple == null || tuple.size() < 3) {
-            return null;
+            return;
         }
         try {
             int n = (Integer) tuple.get(0);
             int fieldNum = (Integer) tuple.get(1);
             DataBag inputBag = (DataBag) tuple.get(2);
             if (inputBag == null) {
-                return null;
+                return;
             }
 
-            PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
-                    new TupleComparator(fieldNum, sortDesc));
-            updateTop(store, n, inputBag);
-            DataBag outputBag = mBagFactory.newDefaultBag();
-            for (Tuple t : store) {
-                outputBag.add(t);
-            }
-            if (log.isDebugEnabled()) {
-                if (randomizer.nextInt(1000) == 1) {
-                    log.debug("outputting a bag: ");
-                    for (Tuple t : outputBag)
-                        log.debug("outputting "+t.toDelimitedString("\t"));
-                    log.debug("==================");
-                }
+            if (store == null) {
+                store = new PriorityQueue<Tuple>(n + 1, new 
TupleComparator(fieldNum, sortDesc));
             }
-            return outputBag;
+            
+            updateTop(store, n, inputBag);
         } catch (ExecException e) {
             throw new RuntimeException("ExecException executing function: ", 
e);
         } catch (Exception e) {
@@ -175,6 +168,40 @@ public class TOP extends EvalFunc<DataBa
         }
     }
 
+       @Override
+       public DataBag getValue() {
+        if (store == null) {
+            return null;
+        }
+        
+        DataBag outputBag = mBagFactory.newDefaultBag();
+        
+        for (Tuple t : store) {
+            outputBag.add(t);
+        }
+        
+        if (log.isDebugEnabled()) {
+            if (randomizer.nextInt(1000) == 1) {
+                log.debug("outputting a bag: ");
+                try {
+                    for (Tuple t : outputBag) {
+                        log.debug("outputting "+t.toDelimitedString("\t"));
+                    }
+                    } catch (ExecException e) {
+                        throw new RuntimeException("ExecException executing 
function: ", e);
+                    }
+                log.debug("==================");
+            }
+        }
+        
+        return outputBag;
+    }
+
+    @Override
+    public void cleanup() {
+        store = null;
+    }
+
     protected static void updateTop(PriorityQueue<Tuple> store, int limit, 
DataBag inputBag) {
         Iterator<Tuple> itr = inputBag.iterator();
         while (itr.hasNext()) {

Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Fri Mar  4 
18:17:39 2016
@@ -22,6 +22,8 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -29,11 +31,13 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -51,6 +55,12 @@ public class TextLoader extends LoadFunc
     protected RecordReader in = null;
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
     private String loadLocation;
+    protected final Log mLog = LogFactory.getLog(getClass());
+
+    // it determines whether to depend on pig's own Bzip2TextInputFormat or
+    // to simply depend on hadoop for handling bzip2 inputs
+    private boolean bzipinput_usehadoops ;
+
 
     @Override
     public Tuple getNext() throws IOException {
@@ -248,9 +258,13 @@ public class TextLoader extends LoadFunc
 
     @Override
     public InputFormat getInputFormat() {
-        if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+        if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
+           && !HadoopShims.isHadoopYARN()
+           && !bzipinput_usehadoops ) {
+            mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {
+            mLog.info("Using PigTextInputFormat");
             return new PigTextInputFormat();
         }
     }
@@ -269,5 +283,8 @@ public class TextLoader extends LoadFunc
     public void setLocation(String location, Job job) throws IOException {
         loadLocation = location;
         FileInputFormat.setInputPaths(job, location);
+        bzipinput_usehadoops = job.getConfiguration().getBoolean(
+                                  
PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT,
+                                  true );
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/builtin/ToDate.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/ToDate.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/ToDate.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/ToDate.java Fri Mar  4 
18:17:39 2016
@@ -117,15 +117,45 @@ public class ToDate extends EvalFunc<Dat
     }
 
     public static DateTimeZone extractDateTimeZone(String dtStr) {
-        return isoDateTimeFormatter.parseDateTime(dtStr).getZone();
+        return 
isoDateTimeFormatter.parseDateTime(allowIso8601Space(dtStr)).getZone();
     }
 
     public static DateTime extractDateTime(String dtStr) {
-        return isoDateTimeFormatter.parseDateTime(dtStr);
+        return isoDateTimeFormatter.parseDateTime(allowIso8601Space(dtStr));
+    }
+    
+       /*
+        * ISO-8601 format and JDBC timestamp format are similar but not the 
same.
+        * 
+        * Strict ISO-8601 specifies a 'T' between the date portion and
+        * the time portion:
+        *   2015-05-29T10:41:30.123
+        *   
+        * ISO-8601 allows a space instead of a 'T' as a looser variant. 
+        * This variant is often adopted because it increases human 
readability. 
+        * The JDBC timestamp format uses the ' ' space variant. 
+        *   2015-05-29 10:41:30.123
+        * 
+        * Hive & Impala are database-oriented and generate JDBC timestamps
+        * with a ' ' space. 
+        * 
+        * We would like to accept both 'T' and ' ' space formats. 
+        * 
+        * org.joda.time.format.ISODateTimeFormatter requires the 'T'. 
+        * The cleanest way to get joda-time to accept both is to convert
+        * the ' ' space to a 'T' before feeding the string to the
+        * ISODateTimeFormatter. 
+        */
+    private static String allowIso8601Space(String dtStr) {
+       if (dtStr == null || dtStr.length() <= 10 || dtStr.charAt(10) != ' ') {
+               return dtStr;
+       }
+       return dtStr.substring(0, 10) + 'T' + dtStr.substring(11);
     }
 
     @Override
     public boolean allowCompileTimeCalculation() {
         return true;
     }
+    
 }

Modified: 
pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java Fri 
Mar  4 18:17:39 2016
@@ -321,6 +321,7 @@ public class Utf8StorageConverter implem
             break;
         case DataType.BIGDECIMAL:
             field = bytesToBigDecimal(b);
+            break;
         case DataType.DATETIME:
             field = bytesToDateTime(b);
             break;

Modified: pig/branches/spark/src/org/apache/pig/builtin/VALUELIST.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/VALUELIST.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/VALUELIST.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/VALUELIST.java Fri Mar  4 
18:17:39 2016
@@ -105,7 +105,7 @@ public class VALUELIST extends EvalFunc<
                 throw new RuntimeException(fe);
             }
             if(fs != null) {
-                innerFieldSchema = new Schema.FieldSchema(null, fs.type);
+                innerFieldSchema = new Schema.FieldSchema(null, new 
Schema(fs));
             }
         } else {
             innerFieldSchema = new Schema.FieldSchema(null, 
DataType.BYTEARRAY);

Modified: pig/branches/spark/src/org/apache/pig/builtin/VALUESET.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/VALUESET.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/VALUESET.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/VALUESET.java Fri Mar  4 
18:17:39 2016
@@ -118,7 +118,7 @@ public class VALUESET extends EvalFunc<D
                 throw new RuntimeException(fe);
             }
             if (fs != null) {
-                innerFieldSchema = new Schema.FieldSchema(null, fs.type);
+                innerFieldSchema = new Schema.FieldSchema(null, new 
Schema(fs));
             }
         } else {
             innerFieldSchema = new Schema.FieldSchema(null, 
DataType.BYTEARRAY);

Modified: pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java Fri Mar  4 
18:17:39 2016
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -74,7 +75,9 @@ import org.apache.pig.parser.ParserExcep
  *      data.set("foo",
  *      tuple("a"),
  *      tuple("b"),
- *      tuple("c")
+ *      tuple("c"),
+ *      tuple(map("d","e", "f","g")),
+ *      tuple(bag(tuple("h"),tuple("i")))
  *      );
  *
  *  pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
@@ -85,6 +88,8 @@ import org.apache.pig.parser.ParserExcep
  *  assertEquals(tuple("a"), out.get(0));
  *  assertEquals(tuple("b"), out.get(1));
  *  assertEquals(tuple("c"), out.get(2));
+ *  assertEquals(tuple(map("f", "g", "d", "e" )), out.get(3));
+ *  assertEquals(tuple(bag(tuple("h"),tuple("i"))), out.get(4));
  * </pre>
  * With Schema:
  *  <pre>
@@ -102,7 +107,7 @@ import org.apache.pig.parser.ParserExcep
  *  pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
  *
  *  assertEquals(schema("a:chararray,b:chararray"), data.getSchema("bar"));
- *  
+ *
  *  List<Tuple> out = data.get("bar");
  *  assertEquals(tuple("a", "a"), out.get(0));
  *  assertEquals(tuple("b", "b"), out.get(1));
@@ -132,7 +137,37 @@ public class Storage extends LoadFunc im
   public static DataBag bag(Tuple... tuples) {
     return new NonSpillableDataBag(Arrays.asList(tuples));
   }
-  
+
+  /**
+   * @param input These params are alternating "key", "value". So the number 
of params MUST be even !!
+   * Implementation is very similar to the TOMAP UDF.
+   * So map("A", B, "C", D) generates a map "A"->B, "C"->D
+   * @return a map containing the provided objects
+   */
+  public static Map<String, Object> map(Object... input) {
+    if (input == null || input.length < 2) {
+      return null;
+    }
+
+    try {
+      Map<String, Object> output = new HashMap<String, Object>();
+
+      for (int i = 0; i < input.length; i=i+2) {
+        String key = (String)input[i];
+        Object val = input[i+1];
+        output.put(key, val);
+      }
+
+      return output;
+    } catch (ClassCastException e){
+      throw new IllegalArgumentException("Map key must be a String");
+    } catch (ArrayIndexOutOfBoundsException e){
+      throw new IllegalArgumentException("Function input must have even number 
of parameters");
+    } catch (Exception e) {
+      throw new RuntimeException("Error while creating a map", e);
+    }
+  }
+
   /**
    * @param schema
    * @return the schema represented by the string
@@ -193,7 +228,8 @@ public class Storage extends LoadFunc im
 
   private static class Parts {
     final String location;
-    final Map<String, Collection<Tuple>> parts = new HashMap<String, 
Collection<Tuple>>();
+    // TreeMap to read part files in order
+    final Map<String, Collection<Tuple>> parts = new TreeMap<String, 
Collection<Tuple>>();
 
     public Parts(String location) {
       super();
@@ -216,7 +252,7 @@ public class Storage extends LoadFunc im
     }
 
   }
-  
+
   /**
    * An isolated data store to avoid side effects
    *
@@ -249,7 +285,7 @@ public class Storage extends LoadFunc im
     public void set(String location, String schema, Tuple... data) throws 
ParserException {
       set(location, Utils.getSchemaFromString(schema), Arrays.asList(data));
     }
-    
+
     /**
      * to set the data in a location with a known schema
      *
@@ -316,7 +352,7 @@ public class Storage extends LoadFunc im
     public void set(String location, Tuple... data) {
         set(location, Arrays.asList(data));
     }
-    
+
     /**
      *
      * @param location
@@ -330,7 +366,7 @@ public class Storage extends LoadFunc im
     }
 
     /**
-     * 
+     *
      * @param location
      * @return the schema stored in this location
      */
@@ -352,7 +388,7 @@ public class Storage extends LoadFunc im
   private String location;
 
   private Data data;
-  
+
   private Schema schema;
 
   private Iterator<Tuple> dataBeingRead;
@@ -403,9 +439,9 @@ private MockRecordWriter mockRecordWrite
   public void setUDFContextSignature(String signature) {
     super.setUDFContextSignature(signature);
   }
-  
+
   // LoadMetaData
-  
+
   @Override
   public ResourceSchema getSchema(String location, Job job) throws IOException 
{
        init(location, job);
@@ -477,7 +513,7 @@ private MockRecordWriter mockRecordWrite
   }
 
   // StoreMetaData
-  
+
   @Override
   public void storeStatistics(ResourceStatistics stats, String location, Job 
job)
                throws IOException {
@@ -490,7 +526,7 @@ private MockRecordWriter mockRecordWrite
        init(location, job);
        data.setSchema(location, Schema.getPigSchema(schema));
   }
-  
+
   // Mocks for LoadFunc
 
   private static class MockRecordReader extends RecordReader<Object, Object> {

Modified: pig/branches/spark/src/org/apache/pig/data/BinInterSedes.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/BinInterSedes.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/BinInterSedes.java Fri Mar  4 
18:17:39 2016
@@ -1072,7 +1072,7 @@ public class BinInterSedes implements In
                         // we have a compound tuple key (main_key, 
secondary_key). Each key has its own sort order, so
                         // we have to deal with them separately. We delegate 
it to the first invocation of
                         // compareDatum()
-                        assert (tsz1 == 3); // main_key, secondary_key, value
+                        assert (tsz1 == 2); // main_key, secondary_key
                         result = compareDatum(t1.get(0), t2.get(0), mAsc);
                         if (result == 0)
                             result = compareDatum(t1.get(1), t2.get(1), 
mSecondaryAsc);

Modified: pig/branches/spark/src/org/apache/pig/data/DataReaderWriter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DataReaderWriter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DataReaderWriter.java Fri Mar  4 
18:17:39 2016
@@ -174,10 +174,14 @@ public class DataReaderWriter {
                 return Double.valueOf(in.readDouble());
 
             case DataType.BIGINTEGER:
-                return new BigInteger(((DataByteArray)readDatum(in, 
in.readByte())).get());
+                byte[] bigIntegerByteArray = new byte[in.readInt()];
+                in.readFully(bigIntegerByteArray);
+                return new BigInteger(bigIntegerByteArray);
 
             case DataType.BIGDECIMAL:
-                return new BigDecimal((String)readDatum(in, in.readByte()));
+                byte[] bt = new byte[in.readInt()];
+                in.readFully(bt);
+                return new BigDecimal(new String(bt, DataReaderWriter.UTF8));
 
             case DataType.BOOLEAN:
                 return Boolean.valueOf(in.readBoolean());
@@ -315,12 +319,16 @@ public class DataReaderWriter {
 
             case DataType.BIGINTEGER:
                 out.writeByte(DataType.BIGINTEGER);
-                writeDatum(out, ((BigInteger)val).toByteArray());
+                byte[] bytes = ((BigInteger)val).toByteArray();
+                out.writeInt(bytes.length);
+                out.write(bytes);
                 break;
 
             case DataType.BIGDECIMAL:
                 out.writeByte(DataType.BIGDECIMAL);
-                writeDatum(out, ((BigDecimal)val).toString());
+                byte[] bt =  
((BigDecimal)val).toString().getBytes(DataReaderWriter.UTF8);
+                out.writeInt(bt.length);
+                out.write(bt);
                 break;
 
             case DataType.CHARARRAY: {

Modified: pig/branches/spark/src/org/apache/pig/data/DataType.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DataType.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DataType.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DataType.java Fri Mar  4 
18:17:39 2016
@@ -1126,7 +1126,7 @@ public class DataType {
             case UNKNOWN:
             default:
                 int errCode = 1071;
-                String msg = "Cannot convert a " + findTypeName(o) + " to a 
Boolean";
+                String msg = "Cannot convert a " + findTypeName(o) + " to a 
DateTime";
                 throw new ExecException(msg, errCode, PigException.INPUT);
             }
         } catch (ClassCastException cce) {
@@ -1135,11 +1135,11 @@ public class DataType {
             throw ee;
         } catch (NumberFormatException nfe) {
             int errCode = 1074;
-            String msg = "Problem with formatting. Could not convert " + o + " 
to Float.";
+            String msg = "Problem with formatting. Could not convert " + o + " 
to DateTime.";
             throw new ExecException(msg, errCode, PigException.INPUT, nfe);
         } catch (Exception e) {
             int errCode = 2054;
-            String msg = "Internal error. Could not convert " + o + " to 
Float.";
+            String msg = "Internal error. Could not convert " + o + " to 
DateTime.";
             throw new ExecException(msg, errCode, PigException.BUG);
         }
     }

Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTuple.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTuple.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTuple.java Fri Mar  4 
18:17:39 2016
@@ -26,6 +26,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
@@ -37,7 +39,6 @@ import org.apache.pig.data.utils.MethodH
 import org.apache.pig.data.utils.SedesHelper;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.mortbay.log.Log;
 
 import com.google.common.collect.Lists;
 
@@ -52,6 +53,7 @@ import com.google.common.collect.Lists;
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public abstract class SchemaTuple<T extends SchemaTuple<T>> extends AbstractTuple implements TypeAwareTuple {
+    private static final Log LOG = LogFactory.getLog(SchemaTuple.class);
 
     private static final long serialVersionUID = 1L;
     private static final int ONE_MINUTE = 60000;
@@ -924,7 +926,7 @@ public abstract class SchemaTuple<T exte
     protected static Schema staticSchemaGen(String s) {
         try {
             if (s.equals("")) {
-                Log.warn("No Schema present in SchemaTuple generated class");
+                LOG.warn("No Schema present in SchemaTuple generated class");
                 return new Schema();
             }
             return (Schema) ObjectSerializer.deserialize(s);

Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Fri Mar  4 
18:17:39 2016
@@ -57,9 +57,7 @@ import org.apache.log4j.Level;
 import org.apache.pig.ExecType;
 import org.apache.pig.ExecTypeProvider;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigException;
-import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
@@ -177,11 +175,7 @@ public class PigContext implements Seria
     // List of paths skipped for automatic shipping
     List<String> skippedShipPaths = new ArrayList<String>();
 
-    static {
-        JVMReuseManager.getInstance().registerForStaticDataCleanup(PigContext.class);
-    }
-
-    @StaticDataCleanup
+    //@StaticDataCleanup
     public static void staticDataCleanup() {
         packageImportList.set(null);
     }

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java Fri Mar  4 
18:17:39 2016
@@ -18,8 +18,11 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
+import java.text.MessageFormat;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -32,6 +35,9 @@ import org.apache.pig.impl.util.UDFConte
 
 
 public class GFCross extends EvalFunc<DataBag> {
+
+    private static final Log LOG = LogFactory.getLog(GFCross.class);
+
     private int numInputs, myNumber, numGroupsPerInput, numGroupsGoingTo;
     private BagFactory mBagFactory = BagFactory.getInstance();
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -70,6 +76,12 @@ public class GFCross extends EvalFunc<Da
 
             numGroupsPerInput = (int) Math.ceil(Math.pow(parallelism, 1.0/numInputs));
             numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
+
+            LOG.info(MessageFormat.format("Parallelism = {0}, numInputs = {1}, myNumber = {2},"
+                            + " numGroupsPerInput = {3}, numGroupsGoingTo = {4}",
+                            parallelism, numInputs, myNumber,
+                            numGroupsPerInput, numGroupsGoingTo));
+
         }
 
         DataBag output = mBagFactory.newDefaultBag();

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GetMemNumRows.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GetMemNumRows.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/GetMemNumRows.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/GetMemNumRows.java Fri 
Mar  4 18:17:39 2016
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.lang.reflect.Type;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.data.SizeUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 
@@ -56,6 +57,7 @@ public class GetMemNumRows extends EvalF
         int tSize = in.size();
        if(tSize >=2 && 
            PoissonSampleLoader.NUMROWS_TUPLE_MARKER.equals(in.get(tSize-2)) ){
+           memSize -= SizeUtil.getPigObjMemSize(PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
            numRows = (Long)in.get(tSize-1);
        }
        

Modified: 
pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java 
Fri Mar  4 18:17:39 2016
@@ -64,6 +64,9 @@ public class PoissonSampleLoader extends
 
     private int sampleRate = DEFAULT_SAMPLE_RATE;
 
+    // total memory in bytes
+    private long totalMemory;
+
     private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
 
     // new Sample tuple
@@ -89,7 +92,8 @@ public class PoissonSampleLoader extends
             if(t == null) {
                 return createNumRowTuple(null);
             }
-            long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+            long availRedMem = (long) ( totalMemory * heapPerc);
+            // availRedMem = 155084396;
             memToSkipPerSample = availRedMem/sampleRate;
             updateSkipInterval(t);
 
@@ -175,6 +179,10 @@ public class PoissonSampleLoader extends
         sampleRate = conf.getInt(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
         heapPerc = conf.getFloat(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
                 PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+        totalMemory = conf.getLong(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1L);
+        if (totalMemory == -1) {
+            totalMemory = Runtime.getRuntime().maxMemory();
+        }
     }
 
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/ReadScalars.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/ReadScalars.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/ReadScalars.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/ReadScalars.java Fri Mar 
 4 18:17:39 2016
@@ -74,7 +74,8 @@ public class ReadScalars extends EvalFun
                     valueLoaded = true;
                     return null;
                 } else if (inputBag.size() > 1) {
-                    String msg = "Scalar has more than one row in the output.";
+                    String msg = "Scalar has more than one row in the output."
+                            +" (common cause: \"JOIN\" then \"FOREACH ... GENERATE foo.bar\" should be \"foo::bar\" )";
                     throw new ExecException(msg);
                 }
                 Tuple t1 = inputBag.iterator().next();
@@ -111,7 +112,8 @@ public class ReadScalars extends EvalFun
                 Tuple t2 = loader.getNext();
                 if(t2 != null){
                     String msg = "Scalar has more than one row in the output. "
-                        + "1st : " + t1 + ", 2nd :" + t2;
+                        + "1st : " + t1 + ", 2nd :" + t2
+                        +" (common cause: \"JOIN\" then \"FOREACH ... GENERATE foo.bar\" should be \"foo::bar\" )";
                     throw new ExecException(msg);
                 }
                 valueLoaded = true;

Modified: 
pig/branches/spark/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/impl/io/NullablePartitionWritable.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/impl/io/NullablePartitionWritable.java 
Fri Mar  4 18:17:39 2016
@@ -70,9 +70,9 @@ public class NullablePartitionWritable e
 
        @Override
     public void readFields(DataInput in) throws IOException {
-               String c = in.readUTF();
+               byte type = in.readByte();
                try {
-                       key = HDataType.getWritableComparable(c);
+                       key = HDataType.getNewWritableComparable(type);
                } catch(Exception e) {
                        throw new IOException(e);
                }
@@ -81,7 +81,7 @@ public class NullablePartitionWritable e
 
        @Override
     public void write(DataOutput out) throws IOException {
-               out.writeUTF(key.getClass().getName());
+               out.writeByte(HDataType.findTypeFromClassName(key.getClass().getName()));
                key.write(out);
        }
 

Modified: pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java Fri Mar  
4 18:17:39 2016
@@ -196,13 +196,15 @@ public class ReadToEndLoader extends Loa
 
     private boolean initializeReader() throws IOException, 
     InterruptedException {
+        // Close the previous reader first
+        if(reader != null) {
+            reader.close();
+            reader = null;
+        }
         if(curSplitIndex > inpSplits.size() - 1) {
             // past the last split, we are done
             return false;
         }
-        if(reader != null){
-            reader.close();
-        }
         InputSplit curSplit = inpSplits.get(curSplitIndex);
         TaskAttemptContext tAContext = 
HadoopShims.createTaskAttemptContext(conf, 
                 new TaskAttemptID());

Modified: 
pig/branches/spark/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java 
Fri Mar  4 18:17:39 2016
@@ -263,7 +263,7 @@ public class SchemaUtil {
                 throw new FrontendException(
                         "Currently pig do not support this kind of type using Schema:"
                                 + DataType.findTypeName(type)
-                                + ". You can write shema by yourself.");
+                                + ". You can write schema by yourself.");
             }
         }
 
@@ -271,11 +271,10 @@ public class SchemaUtil {
 
     private static void checkParameters(List<String> names, List<Byte> dataTypes)
             throws FrontendException {
-        // TODO Auto-generated method stub
         checkDataTypes(dataTypes);
         if (names.size() != dataTypes.size()) {
             throw new FrontendException(
-                    "The number of names is not equal to the number of dataTypes");
+                    "The number of names (" + names.size() + ") is not equal to the number of dataTypes (" + dataTypes.size() + ")");
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorKey.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorKey.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorKey.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorKey.java Fri Mar  4 
18:17:39 2016
@@ -89,5 +89,10 @@ public class OperatorKey implements Seri
             NodeIdGenerator.getGenerator().getNextNodeId(scope));
     }
 
+    static public OperatorKey fromString(String op) {
+        String scope = op.substring(0, op.indexOf("-"));
+        long id = Long.parseLong(op.substring(op.indexOf("-")+1));
+        return new OperatorKey(scope, id);
+    }
 
 }

Modified: 
pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java Fri 
Mar  4 18:17:39 2016
@@ -153,9 +153,15 @@ public abstract class OutputHandler {
             recordDelimLength = recordDelimBa.length - 1; //Ignore trailing \n
             recordDelimStr = new String(recordDelimBa, 0, recordDelimLength,  Charsets.UTF_8);
         }
-        if (recordDelimLength == 0 || currValue.getLength() < recordDelimLength) {
+
+        if (recordDelimLength == 0) {
             return true;
         }
+        //If our current section is less than the delim length, then its not the end of the row.
+        if (currValue.getLength() < recordDelimLength) {
+            return false;
+        }
+
         return currValue.find(recordDelimStr, currValue.getLength() - recordDelimLength) >= 0;
     }
     

Modified: 
pig/branches/spark/src/org/apache/pig/impl/streaming/PigStreamingUDF.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/PigStreamingUDF.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/streaming/PigStreamingUDF.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/streaming/PigStreamingUDF.java 
Fri Mar  4 18:17:39 2016
@@ -202,7 +202,7 @@ public class PigStreamingUDF extends Pig
 
             if (StreamingDelimiters.isDelimiter(DELIMS.getFieldDelim(), buf, index, depth, endIndex)) {
                 val = extractString(buf, fieldStart, index - 1, true);
-                map.put(key, val);
+                if (key != null) map.put(key, val);
                 fieldStart = index + 3;
             }
         }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/CompilerUtils.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/CompilerUtils.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/CompilerUtils.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/CompilerUtils.java Fri Mar  
4 18:17:39 2016
@@ -21,10 +21,11 @@ package org.apache.pig.impl.util;
 import java.util.ArrayList;
 import java.util.List;
 
-
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
@@ -41,30 +42,43 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 
-/* 
+/*
  * A class to add util functions that gets used by LogToPhyTranslator and 
MRCompiler
- * 
+ *
  */
 public class CompilerUtils {
 
-    public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema) throws PlanException {
+    public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema,
+            boolean skewedRightOuterJoin, String isFirstReduceOfKeyClassName) throws PlanException {
         // we currently have POProject[bag] as the only operator in the plan
         // If the bag is an empty bag, we should replace
         // it with a bag with one tuple with null fields so that when we 
flatten
         // we do not drop records (flatten will drop records if the bag is left
-        // as an empty bag) and actually project nulls for the fields in 
+        // as an empty bag) and actually project nulls for the fields in
         // the empty bag
-        
+
         // So we need to get to the following state:
         // POProject[Bag]
-        //         \     
-        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)   
-        //                        \      |    POProject[Bag]             
+        //         \
+        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
+        //                        \      |    POProject[Bag]
+        //                         \     |    /
+        //                          POBinCond
+        // Further, if it is skewed right outer join, only the first reduce of 
the key
+        // will generate tuple with null fields (See PIG-4377)
+        //
+        // POProject[key]              POProject[Bag]
+        //         \                      /
+        //      IsFirstReduceOfKey  POUserFunc["IsEmpty()"]
+        //                   \        /
+        //                    \      /
+        //                       AND  Const[Bag](bag with null fields)
+        //                        \      |    POProject[Bag]
         //                         \     |    /
         //                          POBinCond
         POProject relationProject = (POProject) fePlan.getRoots().get(0);
         try {
-            
+
             // condition of the bincond
             POProject relationProjectForIsEmpty = relationProject.clone();
             fePlan.add(relationProjectForIsEmpty);
@@ -76,7 +90,36 @@ public class CompilerUtils {
             isEmpty.setResultType(DataType.BOOLEAN);
             fePlan.add(isEmpty);
             fePlan.connect(relationProjectForIsEmpty, isEmpty);
-            
+
+            ExpressionOperator cond;
+            if (skewedRightOuterJoin) {
+                POProject projectForKey = new POProject(new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+                projectForKey.setColumn(0);
+                projectForKey.setOverloaded(false);
+                projectForKey.setResultType(inputSchema.getField(0).type);
+
+                POAnd and = new POAnd(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+                        getNextNodeId(scope)));
+                FuncSpec isFirstReduceOfKeySpec = new FuncSpec(isFirstReduceOfKeyClassName);
+                Object f1 = PigContext.instantiateFuncFromSpec(isFirstReduceOfKeySpec);
+                POUserFunc isFirstReduceOfKey = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+                            getNextNodeId(scope)), -1, null, isFirstReduceOfKeySpec, (EvalFunc) f1);
+
+                fePlan.add(projectForKey);
+                fePlan.add(isFirstReduceOfKey);
+                fePlan.add(and);
+
+                fePlan.connect(projectForKey, isFirstReduceOfKey);
+                fePlan.connect(isFirstReduceOfKey, and);
+                fePlan.connect(isEmpty, and);
+                and.setLhs(isFirstReduceOfKey);
+                and.setRhs(isEmpty);
+
+                cond = and;
+            } else {
+                cond = isEmpty;
+            }
+
             // lhs of bincond (const bag with null fields)
             ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
@@ -93,25 +136,25 @@ public class CompilerUtils {
             ce.setResultType(DataType.BAG);
             //this operator doesn't have any predecessors
             fePlan.add(ce);
-            
+
             //rhs of bincond is the original project
             // let's set up the bincond now
             POBinCond bincond = new POBinCond(new OperatorKey(scope,
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-            bincond.setCond(isEmpty);
+            bincond.setCond(cond);
             bincond.setLhs(ce);
             bincond.setRhs(relationProject);
             bincond.setResultType(DataType.BAG);
             fePlan.add(bincond);
 
-            fePlan.connect(isEmpty, bincond);
+            fePlan.connect(cond, bincond);
             fePlan.connect(ce, bincond);
             fePlan.connect(relationProject, bincond);
 
         } catch (Exception e) {
             throw new PlanException("Error setting up outerjoin", e);
         }
-       
+
     }
 
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java Fri Mar 
 4 18:17:39 2016
@@ -22,6 +22,7 @@ import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
@@ -45,7 +46,7 @@ public class PropertiesUtil {
         loadPropertiesFromClasspath(properties, DEFAULT_PROPERTIES_FILE);
         loadPropertiesFromClasspath(properties, PROPERTIES_FILE);
         setDefaultsIfUnset(properties);
-        
+
         //Now set these as system properties only if they are not already 
defined.
         if (log.isDebugEnabled()) {
             for (Object o: properties.keySet()) {
@@ -61,7 +62,13 @@ public class PropertiesUtil {
 
                // Add System properties which include command line overrides
                // Any existing keys will be overridden
-               properties.putAll(System.getProperties());
+        for (Entry<Object, Object> entry : System.getProperties().entrySet()) {
+            String key = (String) entry.getKey();
+            if (key.startsWith("sun.") || key.startsWith("java.")) {
+                continue;
+            }
+            properties.put(key, entry.getValue());
+        }
 
                // For telling error fast when there are problems
                ConfigurationValidator.validatePigProperties(properties) ;
@@ -150,7 +157,7 @@ public class PropertiesUtil {
             properties.setProperty(PigConfiguration.PIG_OPT_FETCH, ""+true);
         }
     }
-    
+
     /**
      * Loads default properties.
      * @return default properties

Modified: 
pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java 
Fri Mar  4 18:17:39 2016
@@ -48,9 +48,13 @@ import org.apache.commons.logging.LogFac
  */
 public class SpillableMemoryManager implements NotificationListener {
 
-    private final Log log = LogFactory.getLog(getClass());
+    private static final Log log = LogFactory.getLog(SpillableMemoryManager.class);
 
-    LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+    private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+    // References to spillables with size
+    private LinkedList<SpillablePtr> spillablesSR = null;
+
+    private Object spillLock = new Object();
 
     // if we freed at least this much, invoke GC
     // (default 40 MB - this can be overridden by user supplied property)
@@ -62,15 +66,15 @@ public class SpillableMemoryManager impl
 
     // this will keep track of memory freed across spills
     // and between GC invocations
-    private static long accumulatedFreeSize = 0L;
+    private long accumulatedFreeSize = 0L;
 
     // fraction of biggest heap for which we want to get
     // "memory usage threshold exceeded" notifications
-    private static double memoryThresholdFraction = 0.7;
+    private double memoryThresholdFraction = 0.7;
 
     // fraction of biggest heap for which we want to get
     // "collection threshold exceeded" notifications
-    private static double collectionMemoryThresholdFraction = 0.5;
+    private double collectionMemoryThresholdFraction = 0.5;
 
     // log notification on usage threshold exceeded only the first time
     private boolean firstUsageThreshExceededLogged = false;
@@ -80,10 +84,18 @@ public class SpillableMemoryManager impl
 
     // fraction of the total heap used for the threshold to determine
     // if we want to perform an extra gc before the spill
-    private static double extraGCThresholdFraction = 0.05;
-    private static long extraGCSpillSizeThreshold  = 0L;
+    private double extraGCThresholdFraction = 0.05;
+    private long extraGCSpillSizeThreshold  = 0L;
+
+    private volatile boolean blockRegisterOnSpill = false;
+
+    private static final SpillableMemoryManager manager = new SpillableMemoryManager();
 
-    private static volatile SpillableMemoryManager manager;
+    //@StaticDataCleanup
+    public static void staticDataCleanup() {
+        manager.spillables.clear();
+        manager.accumulatedFreeSize = 0L;
+    }
 
     private SpillableMemoryManager() {
         ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
@@ -129,9 +141,6 @@ public class SpillableMemoryManager impl
     }
 
     public static SpillableMemoryManager getInstance() {
-        if (manager == null) {
-            manager = new SpillableMemoryManager();
-        }
         return manager;
     }
 
@@ -187,119 +196,136 @@ public class SpillableMemoryManager impl
             }
 
         }
-        clearSpillables();
         if (toFree < 0) {
             log.debug("low memory handler returning " +
                 "because there is nothing to free");
             return;
         }
-        synchronized(spillables) {
-            Collections.sort(spillables, new 
Comparator<WeakReference<Spillable>>() {
 
-                /**
-                 * We don't lock anything, so this sort may not be stable if a 
WeakReference suddenly
-                 * becomes null, but it will be close enough.
-                 * Also between the time we sort and we use these spillables, 
they
-                 * may actually change in size - so this is just best effort
-                 */
-                @Override
-                public int compare(WeakReference<Spillable> o1Ref, 
WeakReference<Spillable> o2Ref) {
-                    Spillable o1 = o1Ref.get();
-                    Spillable o2 = o2Ref.get();
-                    if (o1 == null && o2 == null) {
-                        return 0;
-                    }
-                    if (o1 == null) {
-                        return 1;
+        // Use a separate spillLock to block multiple handleNotification calls
+        synchronized (spillLock) {
+            synchronized(spillables) {
+                spillablesSR = new LinkedList<SpillablePtr>();
+                for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
+                    Spillable s = i.next().get();
+                    if (s == null) {
+                        i.remove();
+                        continue;
                     }
-                    if (o2 == null) {
+                    // Create a list with spillable size for stable sorting. Refer PIG-4012
+                    spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
+                }
+                log.debug("Spillables list size: " + spillablesSR.size());
+                Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
+                    @Override
+                    public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
+                        long o1Size = o1Ref.getMemorySize();
+                        long o2Size = o2Ref.getMemorySize();
+
+                        if (o1Size == o2Size) {
+                            return 0;
+                        }
+                        if (o1Size < o2Size) {
+                            return 1;
+                        }
                         return -1;
                     }
-                    long o1Size = o1.getMemorySize();
-                    long o2Size = o2.getMemorySize();
+                });
+                // Block new bags from being registered
+                blockRegisterOnSpill = true;
+            }
 
-                    if (o1Size == o2Size) {
-                        return 0;
-                    }
-                    if (o1Size < o2Size) {
-                        return 1;
-                    }
-                    return -1;
-                }
-            });
-            long estimatedFreed = 0;
-            int numObjSpilled = 0;
-            boolean invokeGC = false;
-            boolean extraGCCalled = false;
-            for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); 
i.hasNext();) {
-                WeakReference<Spillable> weakRef = i.next();
-                Spillable s = weakRef.get();
-                // Still need to check for null here, even after we removed
-                // above, because the reference may have gone bad on us
-                // since the last check.
-                if (s == null) {
-                    i.remove();
-                    continue;
-                }
-                long toBeFreed = s.getMemorySize();
-                log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold 
= "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
-                // Don't keep trying if the rest of files are too small
-                if (toBeFreed < spillFileSizeThreshold) {
-                    log.debug("spilling small files - getting out of memory 
handler");
-                    break ;
-                }
-                // If single Spillable is bigger than the threshold,
-                // we force GC to make sure we really need to keep this
-                // object before paying for the expensive spill().
-                // Done at most once per handleNotification.
-                // Do not invoke extraGC for GroupingSpillable. Its size will 
always exceed
-                // extraGCSpillSizeThreshold and the data is always strong 
referenced.
-                if( !extraGCCalled && extraGCSpillSizeThreshold != 0
-                    && toBeFreed > extraGCSpillSizeThreshold  && !(s 
instanceof GroupingSpillable)) {
-                    log.debug("Single spillable has size " + toBeFreed + 
"bytes. Calling extra gc()");
-                    // this extra assignment to null is needed so that gc can 
free the
-                    // spillable if nothing else is pointing at it
-                    s = null;
-                    System.gc();
-                    extraGCCalled = true;
-                    // checking again to see if this reference is still valid
-                    s = weakRef.get();
+            try {
+                long estimatedFreed = 0;
+                int numObjSpilled = 0;
+                boolean invokeGC = false;
+                boolean extraGCCalled = false;
+                boolean isGroupingSpillable = false;
+                for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
+                    SpillablePtr sPtr = i.next();
+                    Spillable s = sPtr.get();
+                    // Still need to check for null here, even after we removed
+                    // above, because the reference may have gone bad on us
+                    // since the last check.
                     if (s == null) {
                         i.remove();
-                        accumulatedFreeSize = 0;
-                        invokeGC = false;
                         continue;
                     }
+                    long toBeFreed = sPtr.getMemorySize();
+                    log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
+                    // Don't keep trying if the rest of files are too small
+                    if (toBeFreed < spillFileSizeThreshold) {
+                        log.debug("spilling small files - getting out of memory handler");
+                        break ;
+                    }
+                    isGroupingSpillable = (s instanceof GroupingSpillable);
+                    // If single Spillable is bigger than the threshold,
+                    // we force GC to make sure we really need to keep this
+                    // object before paying for the expensive spill().
+                    // Done at most once per handleNotification.
+                    // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
+                    // extraGCSpillSizeThreshold and the data is always strong referenced.
+                    if( !extraGCCalled && extraGCSpillSizeThreshold != 0
+                        && toBeFreed > extraGCSpillSizeThreshold  && !isGroupingSpillable) {
+                        log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
+                        // this extra assignment to null is needed so that gc can free the
+                        // spillable if nothing else is pointing at it
+                        s = null;
+                        System.gc();
+                        extraGCCalled = true;
+                        // checking again to see if this reference is still valid
+                        s = sPtr.get();
+                        if (s == null) {
+                            i.remove();
+                            accumulatedFreeSize = 0;
+                            invokeGC = false;
+                            continue;
+                        }
+                    }
+                    // Unblock registering of new bags temporarily as aggregation
+                    // of POPartialAgg requires new record to be loaded.
+                    blockRegisterOnSpill = !isGroupingSpillable;
+                    long numSpilled;
+                    try {
+                        numSpilled = s.spill();
+                    } finally {
+                        blockRegisterOnSpill = true;
+                    }
+
+                    if (numSpilled > 0) {
+                        numObjSpilled++;
+                        estimatedFreed += toBeFreed;
+                        accumulatedFreeSize += toBeFreed;
+                    }
+                    // This should significantly reduce the number of small files
+                    // in case that we have a lot of nested bags
+                    if (accumulatedFreeSize > gcActivationSize) {
+                        invokeGC = true;
+                    }
+
+                    if (estimatedFreed > toFree) {
+                        log.debug("Freed enough space - getting out of memory handler");
+                        invokeGC = true;
+                        break;
+                    }
                 }
-                s.spill();
-                numObjSpilled++;
-                estimatedFreed += toBeFreed;
-                accumulatedFreeSize += toBeFreed;
-                // This should significantly reduce the number of small files
-                // in case that we have a lot of nested bags
-                if (accumulatedFreeSize > gcActivationSize) {
-                    invokeGC = true;
+                spillablesSR = null;
+                /* Poke the GC again to see if we successfully freed enough memory */
+                if(invokeGC) {
+                    System.gc();
+                    // now that we have invoked the GC, reset accumulatedFreeSize
+                    accumulatedFreeSize = 0;
                 }
-
-                if (estimatedFreed > toFree) {
-                    log.debug("Freed enough space - getting out of memory 
handler");
-                    invokeGC = true;
-                    break;
+                if(estimatedFreed > 0){
+                    String msg = "Spilled an estimate of " + estimatedFreed +
+                    " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
+                    log.info(msg);
                 }
+            } finally {
+                blockRegisterOnSpill = false;
             }
-            /* Poke the GC again to see if we successfully freed enough memory 
*/
-            if(invokeGC) {
-                System.gc();
-                // now that we have invoked the GC, reset accumulatedFreeSize
-                accumulatedFreeSize = 0;
-            }
-            if(estimatedFreed > 0){
-                String msg = "Spilled an estimate of " + estimatedFreed +
-                " bytes from " + numObjSpilled + " objects. " + 
info.getUsage();;
-                log.info(msg);
-            }
-
         }
+
     }
 
     public void clearSpillables() {
@@ -321,7 +347,7 @@ public class SpillableMemoryManager impl
      * @param s the spillable to track.
      */
     public void registerSpillable(Spillable s) {
-        synchronized(spillables) {
+        synchronized (spillables) {
             // Cleaing the entire list is too expensive.  Just trim off the front while
             // we can.
             WeakReference<Spillable> first = spillables.peek();
@@ -329,7 +355,46 @@ public class SpillableMemoryManager impl
                 spillables.remove();
                 first = spillables.peek();
             }
+
+            if (blockRegisterOnSpill) {
+                // When the spill is happening we do not want to register new bags
+                // save for exceptions like POPartialAgg. So block here.
+                // blockRegisterOnSpill is set to false in the finally block after spill.
+                // But just in case adding a safeguard of 5 min timeout (assuming a large
+                // spill completes within 5 mins) instead of infinitely blocking
+                // in case there are missed corner cases causing deadlock.
+                try {
+                    int i = 6000;
+                    for (; i > 0 && blockRegisterOnSpill; i--) {
+                        Thread.sleep(50);
+                    }
+                    if (i == 0) {
+                        log.warn("Spill took more than 5 mins. This needs investigation");
+                    }
+                } catch (InterruptedException e) {
+                    log.warn("Interrupted exception in registerSpillable while blocked on spill", e);
+                }
+                blockRegisterOnSpill = false;
+            }
             spillables.add(new WeakReference<Spillable>(s));
         }
     }
+
+    private static class SpillablePtr {
+        private WeakReference<Spillable> spillable;
+        private long size;
+
+        public SpillablePtr(Spillable p, long s) {
+            spillable = new WeakReference<Spillable>(p);
+            size = s;
+        }
+
+        public Spillable get() {
+            return spillable.get();
+        }
+
+        public long getMemorySize() {
+            return size;
+        }
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Fri Mar  4 
18:17:39 2016
@@ -24,8 +24,6 @@ import java.util.HashMap;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.JVMReuseManager;
-import org.apache.pig.StaticDataCleanup;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 
 public class UDFContext {
@@ -33,8 +31,9 @@ public class UDFContext {
     private Configuration jconf = null;
     private HashMap<UDFContextKey, Properties> udfConfs;
     private Properties clientSysProps;
-    private static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
-    private static final String UDF_CONTEXT = "pig.udf.context";
+
+    static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
+    static final String UDF_CONTEXT = "pig.udf.context";
 
     private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() {
         @Override
@@ -43,10 +42,6 @@ public class UDFContext {
         }
     };
 
-    static {
-        
JVMReuseManager.getInstance().registerForStaticDataCleanup(UDFContext.class);
-    }
-
     private UDFContext() {
         udfConfs = new HashMap<UDFContextKey, Properties>();
     }
@@ -68,8 +63,8 @@ public class UDFContext {
     /*
      *  internal pig use only - should NOT be called from user code
      */
-    @StaticDataCleanup
-    public static void cleanupStaticData() {
+    //@StaticDataCleanup
+    public static void staticDataCleanup() {
         tss = new ThreadLocal<UDFContext>() {
             @Override
             public UDFContext initialValue() {
@@ -81,6 +76,14 @@ public class UDFContext {
     /*
      *  internal pig use only - should NOT be called from user code
      */
+    HashMap<UDFContextKey, Properties> getUdfConfs() {
+        return udfConfs;
+    }
+
+
+    /*
+     *  internal pig use only - should NOT be called from user code
+     */
     public void setClientSystemProps(Properties properties) {
         clientSysProps = properties;
     }
@@ -201,6 +204,7 @@ public class UDFContext {
         conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
     }
 
+
     /**
      * Populate the udfConfs field.  This function is intended to
      * be called by Map.configure or Reduce.configure on the backend.
@@ -255,23 +259,31 @@ public class UDFContext {
      *  it holds the class and args of the udf, and
      *  implements equals() and hashCode()
      */
-    private static class UDFContextKey implements Serializable{
+    static class UDFContextKey implements Serializable{
 
         private static final long serialVersionUID = 1;
         private String className;
         private String[] args;
 
-        UDFContextKey(){
-        }
-
         UDFContextKey(String className, String [] args){
             this.className = className;
             this.args = args;
         }
 
-        /* (non-Javadoc)
-         * @see java.lang.Object#hashCode()
-         */
+        String getClassName() {
+            return className;
+        }
+
+        String[] getArgs() {
+            return args;
+        }
+
+        @Override
+        public String toString() {
+            return "UDFContextKey [className=" + className + ", args="
+                    + Arrays.toString(args) + "]";
+        }
+
         @Override
         public int hashCode() {
             final int prime = 31;
@@ -282,9 +294,6 @@ public class UDFContext {
             return result;
         }
 
-        /* (non-Javadoc)
-         * @see java.lang.Object#equals(java.lang.Object)
-         */
         @Override
         public boolean equals(Object obj) {
             if (this == obj)

Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Fri Mar  4 
18:17:39 2016
@@ -255,6 +255,13 @@ public class Utils {
         return schema;
     }
 
+    public static Object parseConstant(String constantString) throws ParserException {
+        QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
+                "util", new HashMap<String, String>() ) ;
+        Object constant = queryParser.parseConstant(constantString);
+        return constant;
+    }
+
     /**
      * This method adds FieldSchema of 'input source tag/path' as the first
      * field. This will be called only when PigStorage is invoked with
@@ -616,16 +623,9 @@ public class Utils {
      * @throws IOException
      */
 
-    public static Path depthFirstSearchForFile(final FileStatus fileStatus,
-        final FileSystem fileSystem) throws IOException {
-      if (fileSystem.isFile(fileStatus.getPath())) {
-        return fileStatus.getPath();
-      } else {
-        return depthFirstSearchForFile(
-            fileSystem.listStatus(fileStatus.getPath(), VISIBLE_FILES),
-            fileSystem);
-      }
-
+    public static Path depthFirstSearchForFile(final FileStatus[] statusArray,
+            final FileSystem fileSystem) throws IOException {
+        return depthFirstSearchForFile(statusArray, fileSystem, null);
     }
 
     /**
@@ -637,7 +637,7 @@ public class Utils {
      * @throws IOException
      */
     public static Path depthFirstSearchForFile(final FileStatus[] statusArray,
-        final FileSystem fileSystem) throws IOException {
+        final FileSystem fileSystem, PathFilter filter) throws IOException {
 
       // Most recent files first
       Arrays.sort(statusArray,
@@ -650,10 +650,17 @@ public class Utils {
       );
 
       for (FileStatus f : statusArray) {
-        Path p = depthFirstSearchForFile(f, fileSystem);
-        if (p != null) {
-          return p;
-        }
+          if (fileSystem.isFile(f.getPath())) {
+              if (filter == null || filter.accept(f.getPath())) {
+                  return f.getPath();
+              } else {
+                  continue;
+              }
+            } else {
+              return depthFirstSearchForFile(
+                  fileSystem.listStatus(f.getPath(), VISIBLE_FILES),
+                  fileSystem, filter);
+            }
       }
 
       return null;

Modified: 
pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java 
Fri Mar  4 18:17:39 2016
@@ -99,7 +99,7 @@ public final class AvroBagWrapper<T> imp
               if (arg instanceof IndexedRecord) {
                 return new AvroTupleWrapper<IndexedRecord>((IndexedRecord) 
arg);
               } else {
-                return 
TupleFactory.getInstance().newTuple(AvroTupleWrapper.unionResolver(arg));
+                return 
TupleFactory.getInstance().newTuple(AvroTupleWrapper.getPigObject(arg));
               }
             }
           }

Modified: 
pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java 
Fri Mar  4 18:17:39 2016
@@ -20,6 +20,7 @@ package org.apache.pig.impl.util.avro;
 
 import java.util.AbstractMap;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -65,6 +66,10 @@ public final class AvroMapWrapper implem
 
   @Override
   public boolean containsKey(final Object key) {
+    if (isUtf8key && !(key instanceof  Utf8)) {
+      // Assuming keys can either be utf8 or string
+      return innerMap.containsKey(new Utf8((String) key));
+    }
     return innerMap.containsKey(key);
   }
 
@@ -81,12 +86,7 @@ public final class AvroMapWrapper implem
     } else {
       v = innerMap.get(key);
     }
-
-    if (v instanceof Utf8) {
-      return v.toString();
-    } else {
-      return v;
-    }
+    return AvroTupleWrapper.getPigObject(v);
   }
 
   @Override
@@ -112,6 +112,13 @@ public final class AvroMapWrapper implem
 
   @Override
   public Set<CharSequence> keySet() {
+    if (isUtf8key) {
+      final Set<CharSequence> keySet = new HashSet<CharSequence>();
+      for (CharSequence cs : innerMap.keySet()) {
+        keySet.add(cs.toString());
+      }
+      return keySet;
+    }
     return innerMap.keySet();
   }
 
@@ -122,11 +129,7 @@ public final class AvroMapWrapper implem
         new Function() {
             @Override
             public Object apply(final Object v) {
-              if (v instanceof Utf8) {
-                return v.toString();
-              } else {
-                return v;
-              }
+              return AvroTupleWrapper.getPigObject(v);
             }
           }
         );
@@ -138,18 +141,13 @@ public final class AvroMapWrapper implem
         Sets.newHashSetWithExpectedSize(innerMap.size());
     for (java.util.Map.Entry<CharSequence, Object> e : innerMap.entrySet()) {
       CharSequence k = e.getKey();
-      Object v = e.getValue();
+      final Object v = AvroTupleWrapper.getPigObject(e.getValue());
       if (k instanceof Utf8) {
         k = k.toString();
       }
-      if (v instanceof Utf8) {
-        v = v.toString();
-      }
       theSet.add(new AbstractMap.SimpleEntry<CharSequence, Object>(k, v));
     }
-
     return theSet;
-
   }
 
 }

Modified: 
pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
 Fri Mar  4 18:17:39 2016
@@ -21,11 +21,14 @@ package org.apache.pig.impl.util.avro;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -54,35 +57,7 @@ public class AvroStorageDataConversionUt
       for (Field f : s.getFields()) {
         Object o = t.get(f.pos());
         Schema innerSchema = f.schema();
-        if (AvroStorageSchemaConversionUtilities.isNullableUnion(innerSchema)) 
{
-          if (o == null) {
-            record.put(f.pos(), null);
-            continue;
-          }
-          innerSchema = AvroStorageSchemaConversionUtilities
-              .removeSimpleUnion(innerSchema);
-        }
-        switch(innerSchema.getType()) {
-        case RECORD:
-          record.put(f.pos(), packIntoAvro((Tuple) o, innerSchema));
-          break;
-        case ARRAY:
-          record.put(f.pos(), packIntoAvro((DataBag) o, innerSchema));
-          break;
-        case BYTES:
-          record.put(f.pos(), ByteBuffer.wrap(((DataByteArray) o).get()));
-          break;
-        case FIXED:
-          record.put(f.pos(), new GenericData.Fixed(
-              innerSchema, ((DataByteArray) o).get()));
-          break;
-        default:
-          if (t.getType(f.pos()) == DataType.DATETIME) {
-            record.put(f.pos(), ((DateTime) o).getMillis() );
-          } else {
-            record.put(f.pos(), o);
-          }
-        }
+        record.put(f.pos(), packIntoAvro(o, innerSchema));
       }
       return record;
     } catch (Exception e) {
@@ -123,5 +98,52 @@ public class AvroStorageDataConversionUt
     }
   }
 
+  private static Object packIntoAvro(final Object o, Schema s)
+      throws IOException {
+    if (AvroStorageSchemaConversionUtilities.isNullableUnion(s)) {
+      if (o == null) {
+        return null;
+      }
+      s = AvroStorageSchemaConversionUtilities.removeSimpleUnion(s);
+    }
+    // what if o == null and schema doesn't allow it ?
+    switch (s.getType()) {
+      case RECORD:
+        return packIntoAvro((Tuple) o, s);
+      case ARRAY:
+        return packIntoAvro((DataBag) o, s);
+      case MAP:
+        return packIntoAvro((Map<CharSequence, Object>) o, s);
+      case BYTES:
+        return ByteBuffer.wrap(((DataByteArray) o).get());
+      case FIXED:
+        return new GenericData.Fixed(s, ((DataByteArray) o).get());
+      default:
+        if (DataType.findType(o) == DataType.DATETIME) {
+          return ((DateTime) o).getMillis();
+        } else {
+          return o;
+        }
+    }
+  }
 
+  private static Map<Utf8, Object> packIntoAvro(Map<CharSequence, Object> input, Schema schema)
+      throws IOException {
+    final Map<Utf8, Object> output = new HashMap<Utf8, Object>();
+    for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
+      final Utf8 k = utf8(e.getKey());
+      output.put(k, packIntoAvro(e.getValue(), schema.getValueType()));
+    }
+    return output;
+  }
+
+  private static Utf8 utf8(CharSequence v) {
+    if (v instanceof Utf8) {
+      return (Utf8) v;
+    } else {
+      final StringBuilder sb = new StringBuilder(v.length());
+      sb.append(v);
+      return new Utf8(sb.toString());
+    }
+  }
 }


Reply via email to