Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
 Fri Mar  4 18:17:39 2016
@@ -288,7 +288,7 @@ public abstract class Launcher {
         try {
             jobControlException = 
getExceptionFromString(jobControlExceptionStackTrace);
         } catch (Exception e) {
-            String errMsg = "Could not resolve error that occured when launching job: "
+            String errMsg = "Could not resolve error that occurred when launching job: "
                     + jobControlExceptionStackTrace;
             jobControlException = new RuntimeException(errMsg, throwable);
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
 Fri Mar  4 18:17:39 2016
@@ -176,7 +176,6 @@ public class FetchOptimizer {
 
         @Override
         public void visit() throws VisitorException {
-            new PhyPlanSetter(mPlan).visit();
             super.visit();
         }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
 Fri Mar  4 18:17:39 2016
@@ -137,11 +137,11 @@ public class InputSizeReducerEstimator i
                             }
                         } else {
                             // If file is not found, we should report -1
-                            return -1;
+                            continue;
                         }
                     } else {
                         // If we cannot estimate size of a location, we should report -1
-                        return -1;
+                        continue;
                     }
                 }
             }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Fri Mar  4 18:17:39 2016
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -66,14 +67,12 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.OverwritableStoreFunc;
 import org.apache.pig.PigConfiguration;
-import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.RollupHIIPartitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
@@ -185,13 +184,10 @@ public class JobControlCompiler{
     public static final String PIG_MAP_STORES = "pig.map.stores";
     public static final String PIG_REDUCE_STORES = "pig.reduce.stores";
 
-    private static final String ROLLUP_PARTITIONER = 
RollupHIIPartitioner.class.getName();
-
     // A mapping of job to pair of store locations and tmp locations for that job
     private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
 
     private Map<Job, MapReduceOper> jobMroMap;
-    private int counterSize;
 
     public JobControlCompiler(PigContext pigContext, Configuration conf) {
         this(pigContext, conf, null);
@@ -360,7 +356,7 @@ public class JobControlCompiler{
             {
                 MapReduceOper mro = jobMroMap.get(job);
                 if (!pigContext.inIllustrator && mro.isCounterOperation())
-                    saveCounters(job,mro.getOperationID());
+                    saveCounters(job,mro.getOperationID(), mro.isRowNumber());
                 plan.remove(mro);
             }
         }
@@ -378,10 +374,11 @@ public class JobControlCompiler{
      * these values are passed via configuration file to PORank, by using the unique
      * operation identifier
      */
-    private void saveCounters(Job job, String operationID) {
+    private void saveCounters(Job job, String operationID, boolean isRowNumber 
) {
         Counters counters;
         Group groupCounters;
 
+        int counterSize = -1;
         Long previousValue = 0L;
         Long previousSum = 0L;
         ArrayList<Pair<String,Long>> counterPairs;
@@ -407,24 +404,28 @@ public class JobControlCompiler{
             }
             groupCounters = counters.getGroup(groupName);
 
-            Iterator<Counter> it = groupCounters.iterator();
-            HashMap<Integer,Long> counterList = new HashMap<Integer, Long>();
+            TreeMap<Integer,Long> counterList = new TreeMap<Integer, Long>();
 
-            while(it.hasNext()) {
-                try{
+            Iterator<Counter> it = groupCounters.iterator();
+            while (it.hasNext()) {
+                try {
                     Counter c = it.next();
                     counterList.put(Integer.valueOf(c.getDisplayName()), 
c.getValue());
                 } catch (Exception ex) {
                     ex.printStackTrace();
                 }
             }
+
             counterSize = counterList.size();
             counterPairs = new ArrayList<Pair<String,Long>>();
 
-            for(int i = 0; i < counterSize; i++){
+            // There could be empty tasks with no counters. That is not an issue
+            // and we only need to calculate offsets for non-empty task ids
+            // which will be accessed in PORank.
+            for (Entry<Integer, Long> entry : counterList.entrySet()) {
                 previousSum += previousValue;
-                previousValue = counterList.get(Integer.valueOf(i));
-                counterPairs.add(new Pair<String, 
Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + 
JobControlCompiler.PIG_MAP_SEPARATOR + i, previousSum));
+                previousValue = entry.getValue();
+                counterPairs.add(new Pair<String, 
Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + 
JobControlCompiler.PIG_MAP_SEPARATOR + entry.getKey(), previousSum));
             }
 
             globalCounters.put(operationID, counterPairs);
@@ -528,9 +529,6 @@ public class JobControlCompiler{
         configureCompression(conf);
 
         try{
-            //Set default value for PIG_HII_ROLLUP_OPTIMIZABLE to false
-            conf.setBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, false);
-
             //Process the POLoads
             List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, 
POLoad.class);
 
@@ -646,7 +644,11 @@ public class JobControlCompiler{
                             }
                         }
                         if (!predeployed) {
-                            
putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
+                            if (jar.getFile().toLowerCase().endsWith(".jar")) {
+                                
putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
+                            } else {
+                                setupDistributedCache(pigContext, conf, new 
String[] {jar.getPath()}, true);
+                            }
                         }
                     }
 
@@ -691,10 +693,15 @@ public class JobControlCompiler{
             if(Utils.isLocal(pigContext, conf)) {
                 ConfigurationUtil.replaceConfigForLocalMode(conf);
             }
-            conf.set("pig.inputs", ObjectSerializer.serialize(inp));
-            conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
-            conf.set("pig.inpSignatures", 
ObjectSerializer.serialize(inpSignatureLists));
-            conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+            conf.set(PigInputFormat.PIG_INPUTS, 
ObjectSerializer.serialize(inp));
+            conf.set(PigInputFormat.PIG_INPUT_TARGETS, 
ObjectSerializer.serialize(inpTargets));
+            conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, 
ObjectSerializer.serialize(inpSignatureLists));
+            conf.set(PigInputFormat.PIG_INPUT_LIMITS, 
ObjectSerializer.serialize(inpLimits));
+
+            // Removing job credential entry before serializing pigcontext into jobconf
+            // since this path would be invalid for the new job being created
+            
pigContext.getProperties().remove("mapreduce.job.credentials.binary");
+
             conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
             conf.set("udf.import.list", 
ObjectSerializer.serialize(PigContext.getPackageImportList()));
             // this is for unit tests since some don't create PigServer
@@ -807,6 +814,7 @@ public class JobControlCompiler{
             // set parent plan in all operators in map and reduce plans
             // currently the parent plan is really used only when POStream is present in the plan
             new PhyPlanSetter(mro.mapPlan).visit();
+            new PhyPlanSetter(mro.combinePlan).visit();
             new PhyPlanSetter(mro.reducePlan).visit();
 
             // this call modifies the ReplFiles names of POFRJoin operators
@@ -844,51 +852,14 @@ public class JobControlCompiler{
                 }
                 pack = (POPackage)mro.reducePlan.getRoots().get(0);
 
-                if(pack!=null) {
-                    if(pack.getPivot()!=-1) {
-                        //Set value for PIG_HII_ROLLUP_OPTIMIZABLE to true
-                        
conf.setBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, true);
-                        //Set the pivot value
-                        conf.setInt(PigConstants.PIG_HII_ROLLUP_PIVOT, 
pack.getPivot());
-                        //Set the index of the first field involves in ROLLUP
-                        conf.setInt(PigConstants.PIG_HII_ROLLUP_FIELD_INDEX, 
pack.getRollupFieldIndex());
-                        //Set the original index of the first field involves 
in ROLLUP in case it was moved to the end
-                        //(if we have the combination of cube and rollup)
-                        
conf.setInt(PigConstants.PIG_HII_ROLLUP_OLD_FIELD_INDEX, 
pack.getRollupOldFieldIndex());
-                        //Set the size of total fields that involve in CUBE 
clause
-                        conf.setInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD, 
pack.getDimensionSize());
-                        //Set number of algebraic functions that used after 
rollup
-                        conf.setInt(PigConstants.PIG_HII_NUMBER_ALGEBRAIC, 
pack.getNumberAlgebraic());
-                        //Set number of reducer to 1 due to using IRG algorithm
-                        if(pack.getPivot() == 0 && !mro.reducePlan.isEmpty()) {
-                            updateNumReducers(plan, mro, nwJob);
-                        }
-                    }
-                }
-
                 if (!pigContext.inIllustrator) {
                     mro.reducePlan.remove(pack);
                 }
-
-                if (pack!=null && pack.getPivot()!=-1) {
-                    nwJob.setMapperClass(PigMapReduce.MapRollupHII.class);
-                } else {
-                    nwJob.setMapperClass(PigMapReduce.Map.class);
-                }
-
+                nwJob.setMapperClass(PigMapReduce.Map.class);
                 nwJob.setReducerClass(PigMapReduce.Reduce.class);
 
-                // Set Rollup Partitioner in case the pivot is not equal to -1
-                // and the custormPartitioner name is our rollup partitioner.
-                if (mro.customPartitioner != null) {
-                    if (mro.customPartitioner.equals(ROLLUP_PARTITIONER)) {
-                        if (pack.getPivot()!=-1) {
-                            
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
-                        }
-                    } else {
-                        
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
-                    }
-                }
+                if (mro.customPartitioner != null)
+                    
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
 
                 if(!pigContext.inIllustrator)
                     conf.set("pig.mapPlan", 
ObjectSerializer.serialize(mro.mapPlan));
@@ -1104,26 +1075,6 @@ public class JobControlCompiler{
     }
 
     /**
-     * If pivot position is zero, we use only one reducer
-     * @param plan the MR plan
-     * @param mro the MR operator
-     * @param nwJob the current job
-     * @throws IOException
-     */
-    public void updateNumReducers(MROperPlan plan, MapReduceOper mro,
-    org.apache.hadoop.mapreduce.Job nwJob) throws IOException {
-        // Change number of reducer to 1 if only IRG is used
-        if (mro.customPartitioner != null && 
mro.customPartitioner.equals(ROLLUP_PARTITIONER)) {
-            log.info("Changing Parallelism to 1 due to using IRG");
-        }
-        conf.setInt("pig.info.reducers.default.parallel", 1);
-        conf.setInt("pig.info.reducers.requested.parallel", 1);
-        conf.setInt("pig.info.reducers.estimated.parallel", 1);
-        conf.setInt(MRConfiguration.REDUCE_TASKS, 1);
-        nwJob.setNumReduceTasks(1);
-    }
-
-    /**
      * Calculate the runtime #reducers based on the default_parallel, requested parallel and estimated
      * parallel, and save it to MapReduceOper's runtimeParallelism.
      * @return the runtimeParallelism
@@ -1762,7 +1713,7 @@ public class JobControlCompiler{
         return null;
     }
 
-    private static Path getCacheStagingDir(Configuration conf) throws 
IOException {
+    public static Path getCacheStagingDir(Configuration conf) throws 
IOException {
         String pigTempDir = conf.get(PigConfiguration.PIG_USER_CACHE_LOCATION,
                 conf.get(PigConfiguration.PIG_TEMP_DIR, "/tmp"));
         String currentUser = System.getProperty("user.name");
@@ -1773,7 +1724,7 @@ public class JobControlCompiler{
         return stagingDir;
     }
 
-    private static Path getFromCache(PigContext pigContext,
+    public static Path getFromCache(PigContext pigContext,
             Configuration conf,
             URL url) throws IOException {
         InputStream is1 = null;
@@ -1799,7 +1750,10 @@ public class JobControlCompiler{
             // attempt to copy to cache else return null
             fs.mkdirs(cacheDir, FileLocalizer.OWNER_ONLY_PERMS);
             is2 = url.openStream();
-            os = FileSystem.create(fs, cacheFile, 
FileLocalizer.OWNER_ONLY_PERMS);
+            short replication = 
(short)conf.getInt(PigConfiguration.PIG_USER_CACHE_REPLICATION,
+                    conf.getInt("mapred.submit.replication", 10));
+            os = fs.create(cacheFile, replication);
+            fs.setPermission(cacheFile, FileLocalizer.OWNER_ONLY_PERMS);
             IOUtils.copyBytes(is2, os, 4096, true);
 
             return cacheFile;

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Fri Mar  4 18:17:39 2016
@@ -76,7 +76,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -93,6 +92,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.IsFirstReduceOfKey;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
@@ -1099,11 +1099,6 @@ public class MRCompiler extends PhyPlanV
     }
 
     @Override
-    public void visitPORollupHIIForEach(PORollupHIIForEach op) throws 
VisitorException {
-        visitPOForEach(op);
-    }
-
-    @Override
     public void visitGlobalRearrange(POGlobalRearrange op) throws 
VisitorException{
         try{
             blocking(op);
@@ -1950,8 +1945,13 @@ public class MRCompiler extends PhyPlanV
                 ep.add(prj);
                 eps.add(ep);
                 if (!inner[i]) {
-                    // Add an empty bag for outer join
-                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+                    // Add an empty bag for outer join.
+                    if (i == 0) {
+                        // For right outer, add IsFirstReduceOfKey UDF as well
+                        CompilerUtils.addEmptyBagOuterJoin(ep, 
op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
+                    } else {
+                        CompilerUtils.addEmptyBagOuterJoin(ep, 
op.getSchema(i), false, IsFirstReduceOfKey.class.getName());
+                    }
                 }
                 flat.add(true);
             }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.hadoop.BigDecimalWritable;
 import org.apache.pig.impl.io.NullableBigDecimalWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -74,8 +73,10 @@ public class PigBigDecimalRawComparator
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -93,8 +94,10 @@ public class PigBigDecimalRawComparator
         if (!ndw1.isNull() && !ndw2.isNull()) {
             rc = 
((BigDecimal)ndw1.getValueAsPigType()).compareTo((BigDecimal)ndw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (ndw1.isNull() && ndw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (ndw1.isNull() && ndw2.isNull()) {
+                rc = ndw1.getIndex() - ndw2.getIndex();
+            }
             else if (ndw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.hadoop.BigIntegerWritable;
 import org.apache.pig.impl.io.NullableBigIntegerWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -74,8 +73,10 @@ public class PigBigIntegerRawComparator
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -93,8 +94,10 @@ public class PigBigIntegerRawComparator
         if (!ndw1.isNull() && !ndw2.isNull()) {
             rc = 
((BigInteger)ndw1.getValueAsPigType()).compareTo((BigInteger)ndw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (ndw1.isNull() && ndw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (ndw1.isNull() && ndw2.isNull()) {
+                rc = ndw1.getIndex() - ndw2.getIndex();
+            }
             else if (ndw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.impl.io.NullableBooleanWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
@@ -39,6 +38,7 @@ public class PigBooleanRawComparator ext
         super(NullableBooleanWritable.class);
         mWrappedComp = new BooleanWritable.Comparator();
     }
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -54,6 +54,7 @@ public class PigBooleanRawComparator ext
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -63,6 +64,7 @@ public class PigBooleanRawComparator ext
      * then BooleanWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -72,8 +74,10 @@ public class PigBooleanRawComparator ext
             byte byte2 = b2[s2 + 1];
             rc = (byte1 < byte2) ? -1 : ((byte1 > byte2) ? 1 : 0);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -81,6 +85,7 @@ public class PigBooleanRawComparator ext
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableBooleanWritable nbw1 = (NullableBooleanWritable)o1;
         NullableBooleanWritable nbw2 = (NullableBooleanWritable)o2;
@@ -90,8 +95,10 @@ public class PigBooleanRawComparator ext
         if (!nbw1.isNull() && !nbw2.isNull()) {
             rc = 
((Boolean)nbw1.getValueAsPigType()).compareTo((Boolean)nbw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nbw1.isNull() && nbw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nbw1.isNull() && nbw2.isNull()) {
+                rc = nbw1.getIndex() - nbw2.getIndex();
+            }
             else if (nbw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.data.BinInterSedes;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.io.NullableBytesWritable;
@@ -34,13 +33,14 @@ public class PigBytesRawComparator exten
 
     private final Log mLog = LogFactory.getLog(getClass());
     private boolean[] mAsc;
-    private WritableComparator mWrappedComp;
+    private BinInterSedes.BinInterSedesTupleRawComparator mWrappedComp;
 
     public PigBytesRawComparator() {
         super(NullableBytesWritable.class);
         mWrappedComp = new BinInterSedes.BinInterSedesTupleRawComparator();
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +57,7 @@ public class PigBytesRawComparator exten
         
((BinInterSedes.BinInterSedesTupleRawComparator)mWrappedComp).setConf(conf);
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -69,6 +70,7 @@ public class PigBytesRawComparator exten
      *    For non-bytearrays, we use BinInterSedesTupleRawComparator.
      * If either is null, null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -122,14 +124,26 @@ public class PigBytesRawComparator exten
             if( dataByteArraysCompare ) {
               rc = WritableComparator.compareBytes(b1, offset1, length1, b2, 
offset2, length2);
             } else {
-              // Subtract 2, one for null byte and one for index byte. Also, do not reverse the sign
-              // of rc when mAsc[0] is false because BinInterSedesTupleRawComparator.compare() already
-              // takes that into account.
-              return mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+              // Subtract 2, one for null byte and one for index byte.
+              rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+              // handle PIG-927. If tuples are equal but any field inside tuple is null,
+              // then we do not merge keys if indices are not same
+              if (rc == 0 && mWrappedComp.hasComparedTupleNull()) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+                // Redundant as there will not be any sort order with multiple indices
+                // But just for sake of completeness.
+                if (!mAsc[0]) rc *= -1;
+              }
+              // PIG-4298 - Return here to avoid reversing the sign of rc when
+              // mAsc[0] is false because BinInterSedesTupleRawComparator.compare()
+              // already takes that into account.
+              return rc;
             }
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -137,6 +151,7 @@ public class PigBytesRawComparator exten
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableBytesWritable nbw1 = (NullableBytesWritable)o1;
         NullableBytesWritable nbw2 = (NullableBytesWritable)o2;
@@ -146,8 +161,10 @@ public class PigBytesRawComparator exten
         if (!nbw1.isNull() && !nbw2.isNull()) {
             rc = DataType.compare(nbw1.getValueAsPigType(), 
nbw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nbw1.isNull() && nbw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nbw1.isNull() && nbw2.isNull()) {
+                rc = nbw1.getIndex() - nbw2.getIndex();
+            }
             else if (nbw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 Fri Mar  4 18:17:39 2016
@@ -27,9 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.log4j.PropertyConfigurator;
-import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigException;
-import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -38,11 +36,13 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class PigCombiner {
@@ -75,11 +75,7 @@ public class PigCombiner {
         PigContext pigContext = null;
         private volatile boolean initialized = false;
 
-        static {
-            
JVMReuseManager.getInstance().registerForStaticDataCleanup(Combine.class);
-        }
-
-        @StaticDataCleanup
+        //@StaticDataCleanup
         public static void staticDataCleanup() {
             firstTime = true;
         }
@@ -98,6 +94,8 @@ public class PigCombiner {
                 pigContext = 
(PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
                 if (pigContext.getLog4jProperties()!=null)
                     
PropertyConfigurator.configure(pigContext.getLog4jProperties());
+                UDFContext.getUDFContext().reset();
+                MapRedUtil.setupUDFContext(context.getConfiguration());
 
                 cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
                         .get("pig.combinePlan"));

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -20,17 +20,15 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 
-import org.joda.time.DateTime;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.hadoop.DateTimeWritable;
 import org.apache.pig.impl.io.NullableDateTimeWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.joda.time.DateTime;
 
 public class PigDateTimeRawComparator extends WritableComparator implements
         Configurable {
@@ -44,6 +42,7 @@ public class PigDateTimeRawComparator ex
         mWrappedComp = new DateTimeWritable.Comparator();
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[]) ObjectSerializer.deserialize(conf
@@ -59,6 +58,7 @@ public class PigDateTimeRawComparator ex
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -68,6 +68,7 @@ public class PigDateTimeRawComparator ex
      * IntWritable.compare() is used. If both are null then the indices are
      * compared. Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -75,9 +76,10 @@ public class PigDateTimeRawComparator ex
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0)
-                rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0)
                 rc = -1;
             else
@@ -88,6 +90,7 @@ public class PigDateTimeRawComparator ex
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableDateTimeWritable ndtw1 = (NullableDateTimeWritable) o1;
         NullableDateTimeWritable ndtw2 = (NullableDateTimeWritable) o2;
@@ -98,9 +101,10 @@ public class PigDateTimeRawComparator ex
             rc = ((DateTime) ndtw1.getValueAsPigType())
                     .compareTo((DateTime) ndtw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (ndtw1.isNull() && ndtw2.isNull())
-                rc = 0;
+            // Two nulls are equal if indices are same
+            if (ndtw1.isNull() && ndtw2.isNull()) {
+                rc = ndtw1.getIndex() - ndtw2.getIndex();
+            }
             else if (ndtw1.isNull())
                 rc = -1;
             else

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -21,12 +21,9 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.backend.hadoop.DoubleWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -42,6 +39,7 @@ public class PigDoubleRawComparator exte
         mWrappedComp = new DoubleWritable.Comparator();
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +55,7 @@ public class PigDoubleRawComparator exte
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -66,6 +65,7 @@ public class PigDoubleRawComparator exte
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -73,8 +73,10 @@ public class PigDoubleRawComparator exte
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -82,6 +84,7 @@ public class PigDoubleRawComparator exte
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableDoubleWritable ndw1 = (NullableDoubleWritable)o1;
         NullableDoubleWritable ndw2 = (NullableDoubleWritable)o2;
@@ -91,8 +94,10 @@ public class PigDoubleRawComparator exte
         if (!ndw1.isNull() && !ndw2.isNull()) {
             rc = 
((Double)ndw1.getValueAsPigType()).compareTo((Double)ndw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (ndw1.isNull() && ndw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (ndw1.isNull() && ndw2.isNull()) {
+                rc = ndw1.getIndex() - ndw2.getIndex();
+            }
             else if (ndw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -21,13 +21,10 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.impl.io.NullableFloatWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
@@ -42,6 +39,7 @@ public class PigFloatRawComparator exten
         mWrappedComp = new FloatWritable.Comparator();
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +55,7 @@ public class PigFloatRawComparator exten
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -66,6 +65,7 @@ public class PigFloatRawComparator exten
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -73,8 +73,10 @@ public class PigFloatRawComparator exten
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -82,6 +84,7 @@ public class PigFloatRawComparator exten
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableFloatWritable nfw1 = (NullableFloatWritable)o1;
         NullableFloatWritable nfw2 = (NullableFloatWritable)o2;
@@ -91,8 +94,10 @@ public class PigFloatRawComparator exten
         if (!nfw1.isNull() && !nfw2.isNull()) {
             rc = 
((Float)nfw1.getValueAsPigType()).compareTo((Float)nfw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nfw1.isNull() && nfw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nfw1.isNull() && nfw2.isNull()) {
+                rc = nfw1.getIndex() - nfw2.getIndex();
+            }
             else if (nfw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 Fri Mar  4 18:17:39 2016
@@ -112,7 +112,7 @@ public abstract class PigGenericMapBase
             return;
         }
 
-        
if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP,
 "false").equals("true")) {
+        
if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP,
 "false").equals("true") && !mp.isEmpty()) {
             // If there is a stream in the pipeline or if this map job belongs 
to merge-join we could
             // potentially have more to process - so lets
             // set the flag stating that all map input has been sent

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 Fri Mar  4 18:17:39 2016
@@ -30,11 +30,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
-import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -51,7 +48,6 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -105,11 +101,7 @@ public class PigGenericMapReduce {
 
     public static ThreadLocal<Configuration> sJobConfInternal = new 
ThreadLocal<Configuration>();
 
-    static {
-        
JVMReuseManager.getInstance().registerForStaticDataCleanup(PigGenericMapReduce.class);
-    }
-
-    @StaticDataCleanup
+    //@StaticDataCleanup
     public static void staticDataCleanup() {
         sJobContext = null;
         sJobConf = null;
@@ -139,92 +131,6 @@ public class PigGenericMapReduce {
     }
 
     /**
-     * This map is only used for the Rollup when the RollupHIIOptimizer is 
enabled
-     *
-     */
-    public static class MapRollupHII extends PigMapBase {
-        @Override
-        public void collect(Context oc, Tuple tuple)
-                throws InterruptedException, IOException {
-
-            Byte index = (Byte)tuple.get(0);
-            PigNullableWritable key =
-                HDataType.getWritableComparableTypes(tuple.get(1), keyType);
-            NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-
-            // Both the key and the value need the index.  The key needs it so
-            // that it can be sorted on the index in addition to the key
-            // value.  The value needs it so that POPackage can properly
-            // assign the tuple to its slot in the projection.
-            key.setIndex(index);
-            val.setIndex(index);
-
-            oc.write(key, val);
-        }
-
-        @Override
-        public void cleanup(Context oc)
-                throws InterruptedException, IOException {
-
-            Configuration jConf = oc.getConfiguration();
-
-            boolean isHII = 
jConf.getBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, false);
-            //If our rule is enabled and is using, there will be a 
PORollupHIIForEach
-            //We will create marker tuples which are considered as markers for 
reducers
-            //to calculate the remaining results when that reducer goes to the 
end of the
-            //input records. This marker tuple will have larger size than the 
defaut by one
-            //dimension. This addition dimension will be the value which are 
ranged from 0 to
-            //number of reducers. By this addition, we can make sure that 
every reducers can
-            //receive these marker tuples to finish their works.
-            if(isHII) {
-                int reducerNo = jConf.getInt("mapred.reduce.tasks", 0);
-                int length = 
jConf.getInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD, 0);
-                int nAlgebraic = 
jConf.getInt(PigConstants.PIG_HII_NUMBER_ALGEBRAIC, 1);
-
-                if(length == 0)
-                    return;
-
-                TupleFactory mTupleFactory = TupleFactory.getInstance();
-                //An array of marker tuples which has size equals to number of 
reducers
-                Tuple group[] = new Tuple[reducerNo];
-                int count = 0;
-                //Make sure that all reducers will receive those marker tuples
-                while(count < reducerNo) {
-                    //Create marker tuple with last field is the reducer's 
index,
-                    //the rest are null.
-                    group[count] = mTupleFactory.newTuple();
-                    for (int k = 0; k <= length; k++) {
-                        if(k < length) {
-                            group[count].append(null);
-                        } else {
-                            group[count].append(count);
-                        }
-                    }
-
-                    Tuple value = mTupleFactory.newTuple();
-                    Tuple []tmp = new Tuple[nAlgebraic];
-                    long valtmp = 1;
-                    for(int i = 0; i < nAlgebraic; i++){
-                        tmp[i] = mTupleFactory.newTuple();
-                        tmp[i].append(valtmp);
-                        value.append(tmp[i]);
-                    }
-                    Tuple out = mTupleFactory.newTuple();
-                    out.append(0);
-                    out.append(group[count]);
-                    out.append(value);
-
-                    PigNullableWritable key = 
HDataType.getWritableComparableTypes(out.get(1), keyType);
-                    NullableTuple val = new NullableTuple((Tuple)out.get(2));
-                    oc.write(key, val);
-                    count++;
-                }
-            }
-            super.cleanup(oc);
-         }
-    }
-
-    /**
      * This "specialized" map class is ONLY to be used in pig queries with
      * order by a udf. A UDF used for comparison in the order by expects
      * to be handed tuples. Hence this map class ensures that the "key" used
@@ -609,7 +515,7 @@ public class PigGenericMapReduce {
                 return;
             }
 
-            if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", 
"false").equals("true")) {
+            if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", 
"false").equals("true") && !rp.isEmpty()) {
                 // If there is a stream in the pipeline we could
                 // potentially have more to process - so lets
                 // set the flag stating that all map input has been sent

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 Fri Mar  4 18:17:39 2016
@@ -57,6 +57,9 @@ public class PigInputFormat extends Inpu
             .getLog(PigInputFormat.class);
 
     public static final String PIG_INPUTS = "pig.inputs";
+    public static final String PIG_INPUT_TARGETS = "pig.inpTargets";
+    public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures";
+    public static final String PIG_INPUT_LIMITS = "pig.inpLimits";
 
     /**
      * @deprecated Use {@link UDFContext} instead in the following way to get
@@ -109,7 +112,7 @@ public class PigInputFormat extends Inpu
 
         List<Long> inpLimitLists =
                 (ArrayList<Long>)ObjectSerializer.deserialize(
-                        conf.get("pig.inpLimits"));
+                        conf.get(PIG_INPUT_LIMITS));
 
         return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, 
inpLimitLists.get(pigSplit.getInputIndex()));
     }
@@ -171,7 +174,7 @@ public class PigInputFormat extends Inpu
             Configuration conf) throws IOException {
         List<String> inpSignatureLists =
                 (ArrayList<String>)ObjectSerializer.deserialize(
-                        conf.get("pig.inpSignatures"));
+                        conf.get(PIG_INPUT_SIGNATURES));
         // signature can be null for intermediate jobs where it will not
         // be required to be passed down
         if(inpSignatureLists.get(inputIndex) != null) {
@@ -197,9 +200,9 @@ public class PigInputFormat extends Inpu
         PigContext pigContext;
         try {
             inputs = (ArrayList<FileSpec>) ObjectSerializer
-                    .deserialize(conf.get("pig.inputs"));
+                    .deserialize(conf.get(PIG_INPUTS));
             inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
-                    .deserialize(conf.get("pig.inpTargets"));
+                    .deserialize(conf.get(PIG_INPUT_TARGETS));
             pigContext = (PigContext) ObjectSerializer.deserialize(conf
                     .get("pig.pigContext"));
             
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -21,12 +21,9 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
@@ -39,6 +36,7 @@ public class PigIntRawComparator extends
         super(NullableIntWritable.class);
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -54,6 +52,7 @@ public class PigIntRawComparator extends
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -63,6 +62,7 @@ public class PigIntRawComparator extends
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -72,8 +72,10 @@ public class PigIntRawComparator extends
             int int2 = readInt(b2, s2 + 1);
             rc = (int1 < int2) ? -1 : ((int1 > int2) ? 1 : 0);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -81,6 +83,7 @@ public class PigIntRawComparator extends
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableIntWritable niw1 = (NullableIntWritable)o1;
         NullableIntWritable niw2 = (NullableIntWritable)o2;
@@ -90,8 +93,10 @@ public class PigIntRawComparator extends
         if (!niw1.isNull() && !niw2.isNull()) {
             rc = 
((Integer)niw1.getValueAsPigType()).compareTo((Integer)niw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (niw1.isNull() && niw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (niw1.isNull() && niw2.isNull()) {
+                rc = niw1.getIndex() - niw2.getIndex();
+            }
             else if (niw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -21,22 +21,18 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.pig.impl.io.NullableLongWritable;
 import org.apache.pig.impl.io.NullableLongWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigLongRawComparator extends WritableComparator implements 
Configurable {
 
-    private final Log mLog = LogFactory.getLog(getClass());
-    private boolean[] mAsc;
-    private LongWritable.Comparator mWrappedComp;
+    protected final Log mLog = LogFactory.getLog(getClass());
+    protected boolean[] mAsc;
+    protected LongWritable.Comparator mWrappedComp;
 
     public PigLongRawComparator() {
         super(NullableLongWritable.class);
@@ -44,6 +40,7 @@ public class PigLongRawComparator extend
     }
 
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -59,6 +56,7 @@ public class PigLongRawComparator extend
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -68,6 +66,7 @@ public class PigLongRawComparator extend
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -75,8 +74,10 @@ public class PigLongRawComparator extend
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -84,6 +85,7 @@ public class PigLongRawComparator extend
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableLongWritable nlw1 = (NullableLongWritable)o1;
         NullableLongWritable nlw2 = (NullableLongWritable)o2;
@@ -93,8 +95,10 @@ public class PigLongRawComparator extend
         if (!nlw1.isNull() && !nlw2.isNull()) {
             rc = 
((Long)nlw1.getValueAsPigType()).compareTo((Long)nlw2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nlw1.isNull() && nlw2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nlw1.isNull() && nlw2.isNull()) {
+                rc = nlw1.getIndex() - nlw2.getIndex();
+            }
             else if (nlw1.isNull()) rc = -1;
             else rc = 1;
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
 Fri Mar  4 18:17:39 2016
@@ -34,6 +34,7 @@ import org.apache.pig.OverwritableStoreF
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.StoreFuncDecorator;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
@@ -74,13 +75,14 @@ public class PigOutputFormat extends Out
                 store = reduceStores.get(0);
             }
             StoreFuncInterface sFunc = store.getStoreFunc();
+            StoreFuncDecorator decorator = store.getStoreFuncDecorator();
             // set output location
             PigOutputFormat.setLocation(taskattemptcontext, store);
             // The above call should have update the conf in the JobContext
             // to have the output location - now call checkOutputSpecs()
             RecordWriter writer = sFunc.getOutputFormat().getRecordWriter(
                     taskattemptcontext);
-            return new PigRecordWriter(writer, sFunc, Mode.SINGLE_STORE);
+            return new PigRecordWriter(writer, decorator, Mode.SINGLE_STORE);
         } else {
            // multi store case - in this case, all writing is done through
            // MapReducePOStoreImpl - set up a dummy RecordWriter
@@ -107,18 +109,24 @@ public class PigOutputFormat extends Out
         private StoreFuncInterface sFunc;
 
         /**
+         * The StoreFuncDecorator we use to write Tuples
+         */
+        private StoreFuncDecorator storeDecorator;
+
+        /**
          * Single Query or multi query
          */
         private Mode mode;
 
-        public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc,
+        public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncDecorator storeDecorator,
                 Mode mode)
                 throws IOException {
             this.mode = mode;
 
             if(mode == Mode.SINGLE_STORE) {
                 this.wrappedWriter = wrappedWriter;
-                this.sFunc = sFunc;
+                this.sFunc = storeDecorator.getStorer();
+                this.storeDecorator = storeDecorator;
                 this.sFunc.prepareToWrite(this.wrappedWriter);
             }
         }
@@ -133,7 +141,7 @@ public class PigOutputFormat extends Out
         public void write(WritableComparable key, Tuple value)
                 throws IOException, InterruptedException {
             if(mode == Mode.SINGLE_STORE) {
-                sFunc.putNext(value);
+                storeDecorator.putNext(value);
             } else {
                 throw new IOException("Internal Error: Unexpected code path");
             }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
 Fri Mar  4 18:17:39 2016
@@ -17,19 +17,18 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.data.TupleRawComparator;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 
 public class PigSecondaryKeyComparator extends WritableComparator implements Configurable {
-    private final Log mLog = LogFactory.getLog(getClass());
+
     private TupleRawComparator mComparator=null;
 
     @Override
@@ -54,6 +53,7 @@ public class PigSecondaryKeyComparator e
         return null;
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
 
         // the last byte of a NullableTuple is its Index
@@ -84,6 +84,47 @@ public class PigSecondaryKeyComparator e
                 rc = -1;
             else
                 rc = 1;
+        }
+        return rc;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compare(WritableComparable a, WritableComparable b)
+    {
+        PigNullableWritable wa = (PigNullableWritable)a;
+        PigNullableWritable wb = (PigNullableWritable)b;
+
+        if ((wa.getIndex() & PigNullableWritable.mqFlag) != 0) { // this is a multi-query index
+            if ((wa.getIndex() & PigNullableWritable.idxSpace) < (wb.getIndex() & PigNullableWritable.idxSpace))
+                return -1;
+            else if ((wa.getIndex() & PigNullableWritable.idxSpace) > (wb.getIndex() & PigNullableWritable.idxSpace))
+                return 1;
+            // If equal, we fall through
+        }
+
+        int rc = 0;
+        // If either are null, handle differently.
+        if (!wa.isNull() && !wb.isNull()) {
+            rc = mComparator.compare((Tuple) wa.getValueAsPigType(), (Tuple) wb.getValueAsPigType());
+            // handle PIG-927
+            // if tuples are equal but any field inside tuple is null, then we do not merge keys
+            if (rc == 0 && mComparator.hasComparedTupleNull())
+                rc = (wa.getIndex() & PigNullableWritable.idxSpace) - (wb.getIndex() & PigNullableWritable.idxSpace);
+        } else {
+            // Two nulls are equal if indices are same
+            if (wa.isNull() && wb.isNull()) {
+                if ((wa.getIndex() & PigNullableWritable.idxSpace) < (wb.getIndex() & PigNullableWritable.idxSpace))
+                    rc = -1;
+                else if ((wa.getIndex() & PigNullableWritable.idxSpace) > (wb.getIndex() & PigNullableWritable.idxSpace))
+                    rc = 1;
+                else
+                    rc = 0;
+            }
+            else if (wa.isNull())
+                rc = -1;
+            else
+                rc = 1;
         }
         return rc;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
 Fri Mar  4 18:17:39 2016
@@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -30,13 +32,15 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.List;
-import java.util.HashSet;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
-import java.lang.StringBuilder;
+import java.util.Set;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -46,7 +50,10 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.data.WritableByteArray;
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
@@ -59,19 +66,22 @@ import org.apache.pig.impl.plan.Operator
  * wrapped InputSplit.
  */
 public class PigSplit extends InputSplit implements Writable, Configurable {
+
+
+    private static String FILESPLIT_CLASSNAME = FileSplit.class.getName();
     //The operators to which the tuples from this
     //input file are attached. These are the successors
     //of the load operator representing this input
     private ArrayList<OperatorKey> targetOps;
 
     // index starting from 0 representing the input number
-    // So if we have 3 inputs (say for a 3 way join), then the 
+    // So if we have 3 inputs (say for a 3 way join), then the
     // splits corresponding to the first input will have an index of 0, those
     // corresponding to the second will have an index of 1 and so on
     // This will be used to get the LoadFunc corresponding to the input
     // in PigInputFormat and related code.
     private int inputIndex;
-    
+
     // The real InputSplit this split is wrapping
     private InputSplit[] wrappedSplits;
 
@@ -80,36 +90,36 @@ public class PigSplit extends InputSplit
     // This will be used by MergeJoinIndexer to record the split # in the
     // index
     private int splitIndex;
-    
+
     // index of current splits being process
     private int currentIdx;
-    
+
     // the flag indicates this is a multi-input join (i.e. join)
-    // so that custom Hadoop counters will be created in the 
+    // so that custom Hadoop counters will be created in the
     // back-end to track the number of records for each input.
     private boolean isMultiInputs = false;
-    
+
     // the flag indicates the custom Hadoop counter should be disabled.
     // This is to prevent the number of counters exceeding the limit.
     // This flag is controlled by Pig property "pig.disable.counter" (
     // the default value is 'false').
     private boolean disableCounter = false;
-    
+
     /**
      * the job Configuration
      */
     private Configuration conf;
-    
+
     /**
      * total number of splits - required by skew join
      */
     private int totalSplits;
-    
+
     /**
      * total length
      */
     private long length = -1;
-    
+
     /**
      * overall locations
      */
@@ -118,8 +128,8 @@ public class PigSplit extends InputSplit
     // this seems necessary for Hadoop to instatiate this split on the
     // backend
     public PigSplit() {}
-    
-    public PigSplit(InputSplit[] wrappedSplits, int inputIndex, 
+
+    public PigSplit(InputSplit[] wrappedSplits, int inputIndex,
             List<OperatorKey> targetOps, int splitIndex) {
         this.wrappedSplits = wrappedSplits;
         this.inputIndex = inputIndex;
@@ -127,30 +137,30 @@ public class PigSplit extends InputSplit
         this.splitIndex = splitIndex;
         this.currentIdx = 0;
     }
-    
+
     public List<OperatorKey> getTargetOps() {
         return new ArrayList<OperatorKey>(targetOps);
     }
-    
+
 
     /**
-     * This methods returns the actual InputSplit (as returned by the 
+     * This methods returns the actual InputSplit (as returned by the
      * {@link InputFormat}) which this class is wrapping.
      * @return the wrappedSplit
      */
     public InputSplit getWrappedSplit() {
         return wrappedSplits[currentIdx];
     }
-    
+
     /**
-     * 
+     *
      * @param idx the index into the wrapped splits
      * @return the specified wrapped split
      */
     public InputSplit getWrappedSplit(int idx) {
         return wrappedSplits[idx];
     }
-    
+
     @Override
     @SuppressWarnings("unchecked")
     public String[] getLocations() throws IOException, InterruptedException {
@@ -200,7 +210,7 @@ public class PigSplit extends InputSplit
         }
         return length;
     }
-    
+
     /**
      * Return the length of a wrapped split
      * @param idx the index into the wrapped splits
@@ -210,6 +220,7 @@ public class PigSplit extends InputSplit
         return wrappedSplits[idx].getLength();
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public void readFields(DataInput is) throws IOException {
         disableCounter = is.readBoolean();
@@ -220,32 +231,59 @@ public class PigSplit extends InputSplit
         targetOps = (ArrayList<OperatorKey>) readObject(is);
         int splitLen = is.readInt();
         int distinctSplitClassCount = is.readInt();
+        boolean nonFileSplit = false;
         //construct the input split class name list
+
         String[] distinctSplitClassName = new String[distinctSplitClassCount];
         for (int i = 0; i < distinctSplitClassCount; i++) {
             distinctSplitClassName[i] = is.readUTF();
+            if (!distinctSplitClassName[i].equals(FILESPLIT_CLASSNAME)) {
+                nonFileSplit = true;
+            }
         }
         try {
             SerializationFactory sf = new SerializationFactory(conf);
             // The correct call sequence for Deserializer is, we shall open, then deserialize, but we shall not close
             wrappedSplits = new InputSplit[splitLen];
+
+            if (splitLen <= 0) {
+                return;
+            }
+
+            // Do not compress if everything is FileSplit as it does not compress much
+            // but adds few seconds for 30K+ tasks
+            boolean compress = nonFileSplit && conf.getBoolean(
+                    PigConfiguration.PIG_COMPRESS_INPUT_SPLITS,
+                    PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT);
+            DataInputStream dis = null;
+            if (compress) {
+                int numBytes = is.readInt();
+                byte[] buf = new byte[numBytes];
+                is.readFully(buf, 0, numBytes);
+                dis = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(buf)));
+            }
+            DataInput dataIn = compress ? dis : is;
             for (int i = 0; i < splitLen; i++)
             {
                 //read the className index
-                int index = is.readInt();
+                int index = dataIn.readInt();
                 //get the split class name
                 String splitClassName = distinctSplitClassName[index];
                 Class splitClass = conf.getClassByName(splitClassName);
                 Deserializer d = sf.getDeserializer(splitClass);
-                d.open((InputStream) is);
+                d.open((InputStream) dataIn);
                 wrappedSplits[i] = (InputSplit)ReflectionUtils.newInstance(splitClass, conf);
                 d.deserialize(wrappedSplits[i]);
             }
+            if (compress && splitLen > 0) {
+                dis.close();
+            }
         } catch (ClassNotFoundException e) {
             throw new IOException(e);
         }
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public void write(DataOutput os) throws IOException {
         os.writeBoolean(disableCounter);
@@ -262,6 +300,7 @@ public class PigSplit extends InputSplit
         }
         List<String> distinctSplitClassList = new ArrayList<String>();
         distinctSplitClassList.addAll(splitClassNameSet);
+        boolean nonFileSplit = distinctSplitClassList.size() > 1 || (!distinctSplitClassList.contains(FILESPLIT_CLASSNAME));
         //write the distinct number of split class name
         os.writeInt(distinctSplitClassList.size());
         //write each classname once
@@ -270,20 +309,43 @@ public class PigSplit extends InputSplit
         }
         SerializationFactory sf = new SerializationFactory(conf);
 
+        if (wrappedSplits.length <= 0) {
+            return;
+        }
+
+        boolean compress = nonFileSplit && conf.getBoolean(
+                PigConfiguration.PIG_COMPRESS_INPUT_SPLITS,
+                PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT);
+        WritableByteArray byteStream = null;
+        Deflater deflater = null;
+        DataOutputStream dos = null;
+        if (compress) {
+            byteStream = new WritableByteArray(16384);
+            deflater = new Deflater(Deflater.BEST_COMPRESSION);
+            dos = new DataOutputStream(new DeflaterOutputStream(byteStream, deflater));
+        }
+        DataOutput dataOut = compress ? dos : os;
         for (int i = 0; i < wrappedSplits.length; i++)
         {
             //find out the index of the split class name
             int index = distinctSplitClassList.indexOf(wrappedSplits[i].getClass().getName());
-            os.writeInt(index);
+            dataOut.writeInt(index);
             Serializer s = sf.getSerializer(wrappedSplits[i].getClass());
             //Checks if Serializer is NULL or not before calling open() method on it.
             if (s == null) {
                 throw new IllegalArgumentException("Could not find Serializer for class "+wrappedSplits[i].getClass()+". InputSplits must implement Writable.");
             }
-            s.open((OutputStream) os);
+            s.open((OutputStream) dataOut);
             // The correct call sequence for Serializer is, we shall open, then serialize, but we shall not close
             s.serialize(wrappedSplits[i]);
         }
+        if (compress) {
+            //Get the compressed serialized bytes and write them
+            dos.close();
+            os.writeInt(byteStream.getLength());
+            os.write(byteStream.getData(), 0, byteStream.getLength());
+            deflater.end();
+        }
 
     }
 
@@ -292,6 +354,7 @@ public class PigSplit extends InputSplit
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos);
         oos.writeObject(obj);
+        oos.flush();
         byte[] bytes = baos.toByteArray();
         os.writeInt(bytes.length);
         os.write(bytes);
@@ -323,7 +386,7 @@ public class PigSplit extends InputSplit
     public void setMultiInputs(boolean b) {
         isMultiInputs = b;
     }
-    
+
     /**
      * Returns true if the map has multiple inputs, else false
      * @return true if the map has multiple inputs, else false
@@ -331,7 +394,7 @@ public class PigSplit extends InputSplit
     public boolean isMultiInputs() {
         return isMultiInputs;
     }
-    
+
     @Override
     public Configuration getConf() {
         return conf;
@@ -340,20 +403,20 @@ public class PigSplit extends InputSplit
 
     /** (non-Javadoc)
      * @see 
org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)
-     * 
-     * This will be called by 
+     *
+     * This will be called by
      * {@link PigInputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)}
-     * to be used in {@link #write(DataOutput)} for serializing the 
+     * to be used in {@link #write(DataOutput)} for serializing the
      * wrappedSplit
-     * 
-     * This will be called by Hadoop in the backend to set the right Job 
+     *
+     * This will be called by Hadoop in the backend to set the right Job
      * Configuration (hadoop will invoke this method because PigSplit implements
      * {@link Configurable} - we need this Configuration in readFields() to
-     * deserialize the wrappedSplit 
+     * deserialize the wrappedSplit
      */
     @Override
     public void setConf(Configuration conf) {
-        this.conf = conf;        
+        this.conf = conf;
     }
 
     // package level access because we don't want LoadFunc implementations
@@ -362,9 +425,9 @@ public class PigSplit extends InputSplit
     int getInputIndex() {
         return inputIndex;
     }
-    
+
     /**
-     * 
+     *
      * @return the number of wrapped splits
      */
     public int getNumPaths() {
@@ -402,7 +465,7 @@ public class PigSplit extends InputSplit
                     wrappedSplits[i].getClass().getName() + "\n   
Locations:\n");
                 for (String location :  wrappedSplits[i].getLocations())
                     st.append("    "+location+"\n");
-                st.append("\n-----------------------\n"); 
+                st.append("\n-----------------------\n");
           }
         } catch (IOException e) {
           return null;
@@ -419,7 +482,7 @@ public class PigSplit extends InputSplit
     public boolean disableCounter() {
         return disableCounter;
     }
-    
+
     public void setCurrentIdx(int idx) {
         this.currentIdx = idx;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -21,13 +21,10 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.impl.io.NullableText;
 import org.apache.pig.impl.util.ObjectSerializer;
 
@@ -43,6 +40,7 @@ public class PigTextRawComparator extend
     }
 
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
@@ -57,6 +55,7 @@ public class PigTextRawComparator extend
         }
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -66,6 +65,7 @@ public class PigTextRawComparator extend
      * then IntWritable.compare() is used.  If both are null then the indices
      * are compared.  Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
 
@@ -73,8 +73,10 @@ public class PigTextRawComparator extend
         if (b1[s1] == 0 && b2[s2] == 0) {
             rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
-            // For sorting purposes two nulls are equal.
-            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            // Two nulls are equal if indices are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + 1] - b2[s2 + 1];
+            }
             else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
@@ -82,6 +84,7 @@ public class PigTextRawComparator extend
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableText nt1 = (NullableText)o1;
         NullableText nt2 = (NullableText)o2;
@@ -91,8 +94,10 @@ public class PigTextRawComparator extend
         if (!nt1.isNull() && !nt2.isNull()) {
             rc = ((String)nt1.getValueAsPigType()).compareTo((String)nt2.getValueAsPigType());
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nt1.isNull() && nt2.isNull()) rc = 0;
+            // Two nulls are equal if indices are same
+            if (nt1.isNull() && nt2.isNull()) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
             else if (nt1.isNull()) rc = -1;
             else rc = 1;
         }



Reply via email to