Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Fri Mar 4 18:17:39 2016 @@ -288,7 +288,7 @@ public abstract class Launcher { try { jobControlException = getExceptionFromString(jobControlExceptionStackTrace); } catch (Exception e) { - String errMsg = "Could not resolve error that occured when launching job: " + String errMsg = "Could not resolve error that occurred when launching job: " + jobControlExceptionStackTrace; jobControlException = new RuntimeException(errMsg, throwable); }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Fri Mar 4 18:17:39 2016 @@ -176,7 +176,6 @@ public class FetchOptimizer { @Override public void visit() throws VisitorException { - new PhyPlanSetter(mPlan).visit(); super.visit(); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Fri Mar 4 18:17:39 2016 @@ -137,11 +137,11 @@ public class InputSizeReducerEstimator i } } else { // If file is not found, we should report -1 - return -1; + continue; } } else { // If we cannot estimate size of a location, we should report -1 - return -1; + continue; } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Mar 4 18:17:39 2016 @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.TreeMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -66,14 +67,12 @@ import org.apache.pig.FuncSpec; import org.apache.pig.LoadFunc; import org.apache.pig.OverwritableStoreFunc; import org.apache.pig.PigConfiguration; -import org.apache.pig.PigConstants; import org.apache.pig.PigException; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.RollupHIIPartitioner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner; @@ -185,13 +184,10 @@ public class JobControlCompiler{ public static final String PIG_MAP_STORES = "pig.map.stores"; public static final String PIG_REDUCE_STORES = "pig.reduce.stores"; - private static final String ROLLUP_PARTITIONER = RollupHIIPartitioner.class.getName(); - // A mapping of job to pair of store locations and tmp locations for that job private Map<Job, Pair<List<POStore>, Path>> jobStoreMap; private Map<Job, MapReduceOper> jobMroMap; - private int counterSize; public JobControlCompiler(PigContext pigContext, Configuration conf) { this(pigContext, conf, null); @@ -360,7 +356,7 @@ public class JobControlCompiler{ { MapReduceOper mro = jobMroMap.get(job); if (!pigContext.inIllustrator && mro.isCounterOperation()) - saveCounters(job,mro.getOperationID()); + saveCounters(job,mro.getOperationID(), mro.isRowNumber()); plan.remove(mro); } } @@ -378,10 +374,11 @@ public class JobControlCompiler{ * these values are passed via configuration file to PORank, by using the unique * operation identifier */ - private void saveCounters(Job job, String operationID) { + private void saveCounters(Job job, String operationID, boolean isRowNumber ) { Counters counters; Group groupCounters; + int counterSize = -1; Long previousValue = 0L; Long previousSum = 0L; ArrayList<Pair<String,Long>> counterPairs; @@ -407,24 +404,28 @@ public class JobControlCompiler{ } groupCounters = counters.getGroup(groupName); - Iterator<Counter> it = groupCounters.iterator(); - HashMap<Integer,Long> counterList = new HashMap<Integer, Long>(); + TreeMap<Integer,Long> counterList = new TreeMap<Integer, Long>(); - while(it.hasNext()) { - try{ + Iterator<Counter> it = groupCounters.iterator(); + while (it.hasNext()) { + try { Counter c = it.next(); counterList.put(Integer.valueOf(c.getDisplayName()), c.getValue()); } catch (Exception ex) { ex.printStackTrace(); } } + counterSize = counterList.size(); counterPairs = new ArrayList<Pair<String,Long>>(); - for(int i = 0; i < counterSize; i++){ + // There could be empty tasks with no counters. That is not an issue + // and we only need to calculate offsets for non-empty task ids + // which will be accessed in PORank. + for (Entry<Integer, Long> entry : counterList.entrySet()) { previousSum += previousValue; - previousValue = counterList.get(Integer.valueOf(i)); - counterPairs.add(new Pair<String, Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + JobControlCompiler.PIG_MAP_SEPARATOR + i, previousSum)); + previousValue = entry.getValue(); + counterPairs.add(new Pair<String, Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + JobControlCompiler.PIG_MAP_SEPARATOR + entry.getKey(), previousSum)); } globalCounters.put(operationID, counterPairs); @@ -528,9 +529,6 @@ public class JobControlCompiler{ configureCompression(conf); try{ - //Set default value for PIG_HII_ROLLUP_OPTIMIZABLE to false - conf.setBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, false); - //Process the POLoads List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class); @@ -646,7 +644,11 @@ public class JobControlCompiler{ } } if (!predeployed) { - putJarOnClassPathThroughDistributedCache(pigContext, conf, jar); + if (jar.getFile().toLowerCase().endsWith(".jar")) { + putJarOnClassPathThroughDistributedCache(pigContext, conf, jar); + } else { + setupDistributedCache(pigContext, conf, new String[] {jar.getPath()}, true); + } } } @@ -691,10 +693,15 @@ public class JobControlCompiler{ if(Utils.isLocal(pigContext, conf)) { ConfigurationUtil.replaceConfigForLocalMode(conf); } - conf.set("pig.inputs", ObjectSerializer.serialize(inp)); - conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets)); - conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists)); - conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits)); + conf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(inp)); + conf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets)); + conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(inpSignatureLists)); + conf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits)); + + // Removing job credential entry before serializing pigcontext into jobconf + // since this path would be invalid for the new job being created + pigContext.getProperties().remove("mapreduce.job.credentials.binary"); + conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext)); conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList())); // this is for unit tests since some don't create PigServer @@ -807,6 +814,7 @@ public class JobControlCompiler{ // set parent plan in all operators in map and reduce plans // currently the parent plan is really used only when POStream is present in the plan new PhyPlanSetter(mro.mapPlan).visit(); + new PhyPlanSetter(mro.combinePlan).visit(); new PhyPlanSetter(mro.reducePlan).visit(); // this call modifies the ReplFiles names of POFRJoin operators @@ -844,51 +852,14 @@ public class JobControlCompiler{ } pack = (POPackage)mro.reducePlan.getRoots().get(0); - if(pack!=null) { - if(pack.getPivot()!=-1) { - //Set value for PIG_HII_ROLLUP_OPTIMIZABLE to true - conf.setBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, true); - //Set the pivot value - conf.setInt(PigConstants.PIG_HII_ROLLUP_PIVOT, pack.getPivot()); - //Set the index of the first field involves in ROLLUP - conf.setInt(PigConstants.PIG_HII_ROLLUP_FIELD_INDEX, pack.getRollupFieldIndex()); - //Set the original index of the first field involves in ROLLUP in case it was moved to the end - //(if we have the combination of cube and rollup) - conf.setInt(PigConstants.PIG_HII_ROLLUP_OLD_FIELD_INDEX, pack.getRollupOldFieldIndex()); - //Set the size of total fields that involve in CUBE clause - conf.setInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD, pack.getDimensionSize()); - //Set number of algebraic functions that used after rollup - conf.setInt(PigConstants.PIG_HII_NUMBER_ALGEBRAIC, pack.getNumberAlgebraic()); - //Set number of reducer to 1 due to using IRG algorithm - if(pack.getPivot() == 0 && !mro.reducePlan.isEmpty()) { - updateNumReducers(plan, mro, nwJob); - } - } - } - if (!pigContext.inIllustrator) { mro.reducePlan.remove(pack); } - - if (pack!=null && pack.getPivot()!=-1) { - nwJob.setMapperClass(PigMapReduce.MapRollupHII.class); - } else { - nwJob.setMapperClass(PigMapReduce.Map.class); - } - + nwJob.setMapperClass(PigMapReduce.Map.class); nwJob.setReducerClass(PigMapReduce.Reduce.class); - // Set Rollup Partitioner in case the pivot is not equal to -1 - // and the custormPartitioner name is our rollup partitioner. - if (mro.customPartitioner != null) { - if (mro.customPartitioner.equals(ROLLUP_PARTITIONER)) { - if (pack.getPivot()!=-1) { - nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner)); - } - } else { - nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner)); - } - } + if (mro.customPartitioner != null) + nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner)); if(!pigContext.inIllustrator) conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan)); @@ -1104,26 +1075,6 @@ public class JobControlCompiler{ } /** - * If pivot position is zero, we use only one reducer - * @param plan the MR plan - * @param mro the MR operator - * @param nwJob the current job - * @throws IOException - */ - public void updateNumReducers(MROperPlan plan, MapReduceOper mro, - org.apache.hadoop.mapreduce.Job nwJob) throws IOException { - // Change number of reducer to 1 if only IRG is used - if (mro.customPartitioner != null && mro.customPartitioner.equals(ROLLUP_PARTITIONER)) { - log.info("Changing Parallelism to 1 due to using IRG"); - } - conf.setInt("pig.info.reducers.default.parallel", 1); - conf.setInt("pig.info.reducers.requested.parallel", 1); - conf.setInt("pig.info.reducers.estimated.parallel", 1); - conf.setInt(MRConfiguration.REDUCE_TASKS, 1); - nwJob.setNumReduceTasks(1); - } - - /** * Calculate the runtime #reducers based on the default_parallel, requested parallel and estimated * parallel, and save it to MapReduceOper's runtimeParallelism. * @return the runtimeParallelism @@ -1762,7 +1713,7 @@ public class JobControlCompiler{ return null; } - private static Path getCacheStagingDir(Configuration conf) throws IOException { + public static Path getCacheStagingDir(Configuration conf) throws IOException { String pigTempDir = conf.get(PigConfiguration.PIG_USER_CACHE_LOCATION, conf.get(PigConfiguration.PIG_TEMP_DIR, "/tmp")); String currentUser = System.getProperty("user.name"); @@ -1773,7 +1724,7 @@ public class JobControlCompiler{ return stagingDir; } - private static Path getFromCache(PigContext pigContext, + public static Path getFromCache(PigContext pigContext, Configuration conf, URL url) throws IOException { InputStream is1 = null; @@ -1799,7 +1750,10 @@ public class JobControlCompiler{ // attempt to copy to cache else return null fs.mkdirs(cacheDir, FileLocalizer.OWNER_ONLY_PERMS); is2 = url.openStream(); - os = FileSystem.create(fs, cacheFile, FileLocalizer.OWNER_ONLY_PERMS); + short replication = (short)conf.getInt(PigConfiguration.PIG_USER_CACHE_REPLICATION, + conf.getInt("mapred.submit.replication", 10)); + os = fs.create(cacheFile, replication); + fs.setPermission(cacheFile, FileLocalizer.OWNER_ONLY_PERMS); IOUtils.copyBytes(is2, os, 4096, true); return cacheFile; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Mar 4 18:17:39 2016 @@ -76,7 +76,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; @@ -93,6 +92,7 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.builtin.DefaultIndexableLoader; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.builtin.GetMemNumRows; +import org.apache.pig.impl.builtin.IsFirstReduceOfKey; import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.builtin.PoissonSampleLoader; import org.apache.pig.impl.builtin.RandomSampleLoader; @@ -1099,11 +1099,6 @@ public class MRCompiler extends PhyPlanV } @Override - public void visitPORollupHIIForEach(PORollupHIIForEach op) throws VisitorException { - visitPOForEach(op); - } - - @Override public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{ try{ blocking(op); @@ -1950,8 +1945,13 @@ public class MRCompiler extends PhyPlanV ep.add(prj); eps.add(ep); if (!inner[i]) { - // Add an empty bag for outer join - CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i)); + // Add an empty bag for outer join. + if (i == 0) { + // For right outer, add IsFirstReduceOfKey UDF as well + CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName()); + } else { + CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKey.class.getName()); + } } flat.add(true); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java Fri Mar 4 18:17:39 2016 @@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; import org.apache.pig.backend.hadoop.BigDecimalWritable; import org.apache.pig.impl.io.NullableBigDecimalWritable; import org.apache.pig.impl.util.ObjectSerializer; @@ -74,8 +73,10 @@ public class PigBigDecimalRawComparator if (b1[s1] == 0 && b2[s2] == 0) { rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else rc = 1; } @@ -93,8 +94,10 @@ public class PigBigDecimalRawComparator if (!ndw1.isNull() && !ndw2.isNull()) { rc = ((BigDecimal)ndw1.getValueAsPigType()).compareTo((BigDecimal)ndw2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (ndw1.isNull() && ndw2.isNull()) rc = 0; + // Two nulls are equal if indices are same + if (ndw1.isNull() && ndw2.isNull()) { + rc = ndw1.getIndex() - ndw2.getIndex(); + } else if (ndw1.isNull()) rc = -1; else rc = 1; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java Fri Mar 4 18:17:39 2016 @@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; import org.apache.pig.backend.hadoop.BigIntegerWritable; import org.apache.pig.impl.io.NullableBigIntegerWritable; import org.apache.pig.impl.util.ObjectSerializer; @@ -74,8 +73,10 @@ public class PigBigIntegerRawComparator if (b1[s1] == 0 && b2[s2] == 0) { rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else rc = 1; } @@ -93,8 +94,10 @@ public class PigBigIntegerRawComparator if (!ndw1.isNull() && !ndw2.isNull()) { rc = ((BigInteger)ndw1.getValueAsPigType()).compareTo((BigInteger)ndw2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (ndw1.isNull() && ndw2.isNull()) rc = 0; + // Two nulls are equal if indices are same + if (ndw1.isNull() && ndw2.isNull()) { + rc = ndw1.getIndex() - ndw2.getIndex(); + } else if (ndw1.isNull()) rc = -1; else rc = 1; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java Fri Mar 4 18:17:39 2016 @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configurab import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; import org.apache.pig.impl.io.NullableBooleanWritable; import org.apache.pig.impl.util.ObjectSerializer; @@ -39,6 +38,7 @@ public class PigBooleanRawComparator ext super(NullableBooleanWritable.class); mWrappedComp = new BooleanWritable.Comparator(); } + @Override public void setConf(Configuration conf) { try { mAsc = (boolean[])ObjectSerializer.deserialize(conf.get( @@ -54,6 +54,7 @@ public class PigBooleanRawComparator ext } } + @Override public Configuration getConf() { return null; } @@ -63,6 +64,7 @@ public class PigBooleanRawComparator ext * then BooleanWritable.compare() is used. If both are null then the indices * are compared. Otherwise the null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; @@ -72,8 +74,10 @@ public class PigBooleanRawComparator ext byte byte2 = b2[s2 + 1]; rc = (byte1 < byte2) ? -1 : ((byte1 > byte2) ? 1 : 0); } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else rc = 1; } @@ -81,6 +85,7 @@ public class PigBooleanRawComparator ext return rc; } + @Override public int compare(Object o1, Object o2) { NullableBooleanWritable nbw1 = (NullableBooleanWritable)o1; NullableBooleanWritable nbw2 = (NullableBooleanWritable)o2; @@ -90,8 +95,10 @@ public class PigBooleanRawComparator ext if (!nbw1.isNull() && !nbw2.isNull()) { rc = ((Boolean)nbw1.getValueAsPigType()).compareTo((Boolean)nbw2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (nbw1.isNull() && nbw2.isNull()) rc = 0; + // Two nulls are equal if indices are same + if (nbw1.isNull() && nbw2.isNull()) { + rc = nbw1.getIndex() - nbw2.getIndex(); + } else if (nbw1.isNull()) rc = -1; else rc = 1; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Fri Mar 4 18:17:39 2016 @@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; import org.apache.pig.data.BinInterSedes; import org.apache.pig.data.DataType; import org.apache.pig.impl.io.NullableBytesWritable; @@ -34,13 +33,14 @@ public class PigBytesRawComparator exten private final Log mLog = LogFactory.getLog(getClass()); private boolean[] mAsc; - private WritableComparator mWrappedComp; + private BinInterSedes.BinInterSedesTupleRawComparator mWrappedComp; public PigBytesRawComparator() { super(NullableBytesWritable.class); mWrappedComp = new BinInterSedes.BinInterSedesTupleRawComparator(); } + @Override public void setConf(Configuration conf) { try { mAsc = (boolean[])ObjectSerializer.deserialize(conf.get( @@ -57,6 +57,7 @@ public class PigBytesRawComparator exten ((BinInterSedes.BinInterSedesTupleRawComparator)mWrappedComp).setConf(conf); } + @Override public Configuration getConf() { return null; } @@ -69,6 +70,7 @@ public class PigBytesRawComparator exten * For non-bytearrays, we use BinInterSedesTupleRawComparator. * If either is null, null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; @@ -122,14 +124,26 @@ public class PigBytesRawComparator exten if( dataByteArraysCompare ) { rc = WritableComparator.compareBytes(b1, offset1, length1, b2, offset2, length2); } else { - // Subtract 2, one for null byte and one for index byte. Also, do not reverse the sign - // of rc when mAsc[0] is false because BinInterSedesTupleRawComparator.compare() already - // takes that into account. - return mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); + // Subtract 2, one for null byte and one for index byte. + rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); + // handle PIG-927. If tuples are equal but any field inside tuple is null, + // then we do not merge keys if indices are not same + if (rc == 0 && mWrappedComp.hasComparedTupleNull()) { + rc = b1[s1 + 1] - b2[s2 + 1]; + // Redundant as there will not be any sort order with multiple indices + // But just for sake of completeness. + if (!mAsc[0]) rc *= -1; + } + // PIG-4298 - Return here to avoid reversing the sign of rc when + // mAsc[0] is false because BinInterSedesTupleRawComparator.compare() + // already takes that into account. + return rc; } } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else rc = 1; } @@ -137,6 +151,7 @@ public class PigBytesRawComparator exten return rc; } + @Override public int compare(Object o1, Object o2) { NullableBytesWritable nbw1 = (NullableBytesWritable)o1; NullableBytesWritable nbw2 = (NullableBytesWritable)o2; @@ -146,8 +161,10 @@ public class PigBytesRawComparator exten if (!nbw1.isNull() && !nbw2.isNull()) { rc = DataType.compare(nbw1.getValueAsPigType(), nbw2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (nbw1.isNull() && nbw2.isNull()) rc = 0; + // Two nulls are equal if indices are same + if (nbw1.isNull() && nbw2.isNull()) { + rc = nbw1.getIndex() - nbw2.getIndex(); + } else if (nbw1.isNull()) rc = -1; else rc = 1; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Mar 4 18:17:39 2016 @@ -27,9 +27,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.log4j.PropertyConfigurator; -import org.apache.pig.JVMReuseManager; import org.apache.pig.PigException; -import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -38,11 +36,13 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.UDFContext; import org.apache.pig.tools.pigstats.PigStatusReporter; public class PigCombiner { @@ -75,11 +75,7 @@ public class PigCombiner { PigContext pigContext = null; private volatile boolean initialized = false; - static { - JVMReuseManager.getInstance().registerForStaticDataCleanup(Combine.class); - } - - @StaticDataCleanup + //@StaticDataCleanup public static void staticDataCleanup() { firstTime = true; } @@ -98,6 +94,8 @@ public class PigCombiner { pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext")); if (pigContext.getLog4jProperties()!=null) PropertyConfigurator.configure(pigContext.getLog4jProperties()); + UDFContext.getUDFContext().reset(); + MapRedUtil.setupUDFContext(context.getConfiguration()); cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf .get("pig.combinePlan")); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java Fri Mar 4 18:17:39 2016 @@ -20,17 +20,15 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; -import org.joda.time.DateTime; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; import org.apache.pig.backend.hadoop.DateTimeWritable; import org.apache.pig.impl.io.NullableDateTimeWritable; import org.apache.pig.impl.util.ObjectSerializer; +import org.joda.time.DateTime; public class PigDateTimeRawComparator extends WritableComparator implements Configurable { @@ -44,6 +42,7 @@ public class PigDateTimeRawComparator ex mWrappedComp = new DateTimeWritable.Comparator(); } + @Override public void setConf(Configuration conf) { try { mAsc = (boolean[]) ObjectSerializer.deserialize(conf @@ -59,6 +58,7 @@ public class PigDateTimeRawComparator ex } } + @Override public Configuration getConf() { return null; } @@ -68,6 +68,7 @@ public class PigDateTimeRawComparator ex * IntWritable.compare() is used. If both are null then the indices are * compared. Otherwise the null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; @@ -75,9 +76,10 @@ public class PigDateTimeRawComparator ex if (b1[s1] == 0 && b2[s2] == 0) { rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) - rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else @@ -88,6 +90,7 @@ public class PigDateTimeRawComparator ex return rc; } + @Override public int compare(Object o1, Object o2) { NullableDateTimeWritable ndtw1 = (NullableDateTimeWritable) o1; NullableDateTimeWritable ndtw2 = (NullableDateTimeWritable) o2; @@ -98,9 +101,10 @@ public class PigDateTimeRawComparator ex rc = ((DateTime) ndtw1.getValueAsPigType()) .compareTo((DateTime) ndtw2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (ndtw1.isNull() && ndtw2.isNull()) - rc = 0; + // Two nulls are equal if indices are same + if (ndtw1.isNull() && ndtw2.isNull()) { + rc = ndtw1.getIndex() - ndtw2.getIndex(); + } else if (ndtw1.isNull()) rc = -1; else Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Fri Mar 4 18:17:39 2016 @@ -21,12 +21,9 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; - import org.apache.pig.backend.hadoop.DoubleWritable; import org.apache.pig.impl.io.NullableDoubleWritable; import org.apache.pig.impl.util.ObjectSerializer; @@ -42,6 +39,7 @@ public class PigDoubleRawComparator exte mWrappedComp = new DoubleWritable.Comparator(); } + @Override public void setConf(Configuration conf) { try { mAsc = (boolean[])ObjectSerializer.deserialize(conf.get( @@ -57,6 +55,7 @@ public class PigDoubleRawComparator exte } } + @Override public Configuration getConf() { return null; } @@ -66,6 +65,7 @@ public class PigDoubleRawComparator exte * then IntWritable.compare() is used. If both are null then the indices * are compared. Otherwise the null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; @@ -73,8 +73,10 @@ public class PigDoubleRawComparator exte if (b1[s1] == 0 && b2[s2] == 0) { rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else rc = 1; } @@ -82,6 +84,7 @@ public class PigDoubleRawComparator exte return rc; } + @Override public int compare(Object o1, Object o2) { NullableDoubleWritable ndw1 = (NullableDoubleWritable)o1; NullableDoubleWritable ndw2 = (NullableDoubleWritable)o2; @@ -91,8 +94,10 @@ public class PigDoubleRawComparator exte if (!ndw1.isNull() && !ndw2.isNull()) { rc = ((Double)ndw1.getValueAsPigType()).compareTo((Double)ndw2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (ndw1.isNull() && ndw2.isNull()) rc = 0; + // Two nulls are equal if indices are same + if (ndw1.isNull() && ndw2.isNull()) { + rc = ndw1.getIndex() - ndw2.getIndex(); + } else if (ndw1.isNull()) rc = -1; else rc = 1; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Fri Mar 4 18:17:39 2016 @@ -21,13 +21,10 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; - import org.apache.pig.impl.io.NullableFloatWritable; import org.apache.pig.impl.util.ObjectSerializer; @@ -42,6 +39,7 @@ public class PigFloatRawComparator exten mWrappedComp = new FloatWritable.Comparator(); } + @Override public void setConf(Configuration conf) { try { mAsc = (boolean[])ObjectSerializer.deserialize(conf.get( @@ -57,6 +55,7 @@ public class PigFloatRawComparator exten } } + @Override public Configuration getConf() { return null; } @@ -66,6 +65,7 @@ public class PigFloatRawComparator exten * then IntWritable.compare() is used. If both are null then the indices * are compared. Otherwise the null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; @@ -73,8 +73,10 @@ public class PigFloatRawComparator exten if (b1[s1] == 0 && b2[s2] == 0) { rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else rc = 1; } @@ -82,6 +84,7 @@ public class PigFloatRawComparator exten return rc; } + @Override public int compare(Object o1, Object o2) { NullableFloatWritable nfw1 = (NullableFloatWritable)o1; NullableFloatWritable nfw2 = (NullableFloatWritable)o2; @@ -91,8 +94,10 @@ public class PigFloatRawComparator exten if (!nfw1.isNull() && !nfw2.isNull()) { rc = ((Float)nfw1.getValueAsPigType()).compareTo((Float)nfw2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (nfw1.isNull() && nfw2.isNull()) rc = 0; + // Two nulls are equal if indices are same + if (nfw1.isNull() && nfw2.isNull()) { + rc = nfw1.getIndex() - nfw2.getIndex(); + } else if (nfw1.isNull()) rc = -1; else rc = 1; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri Mar 4 18:17:39 2016 @@ -112,7 +112,7 @@ public abstract class PigGenericMapBase return; } - if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) { + if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true") && !mp.isEmpty()) { // If there is a stream in the pipeline or if this map job belongs to merge-join we could // potentially have more to process - so lets // set the flag stating that all map input has been sent Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri Mar 4 18:17:39 2016 @@ -30,11 +30,8 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.pig.JVMReuseManager; import org.apache.pig.PigConstants; import org.apache.pig.PigException; -import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; @@ -51,7 +48,6 @@ import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.SchemaTupleBackend; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.NullablePartitionWritable; import org.apache.pig.impl.io.NullableTuple; @@ -105,11 +101,7 @@ public class PigGenericMapReduce { public static ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>(); - static { - JVMReuseManager.getInstance().registerForStaticDataCleanup(PigGenericMapReduce.class); - } - - @StaticDataCleanup + //@StaticDataCleanup public static void staticDataCleanup() { sJobContext = null; sJobConf = null; @@ -139,92 +131,6 @@ public class PigGenericMapReduce { } /** - * This map is only used for the Rollup when the RollupHIIOptimizer is enabled - * - */ - public static class MapRollupHII extends PigMapBase { - @Override - public void collect(Context oc, Tuple tuple) - throws InterruptedException, IOException { - - Byte index = (Byte)tuple.get(0); - PigNullableWritable key = - HDataType.getWritableComparableTypes(tuple.get(1), keyType); - NullableTuple val = new NullableTuple((Tuple)tuple.get(2)); - - // Both the key and the value need the index. The key needs it so - // that it can be sorted on the index in addition to the key - // value. The value needs it so that POPackage can properly - // assign the tuple to its slot in the projection. - key.setIndex(index); - val.setIndex(index); - - oc.write(key, val); - } - - @Override - public void cleanup(Context oc) - throws InterruptedException, IOException { - - Configuration jConf = oc.getConfiguration(); - - boolean isHII = jConf.getBoolean(PigConstants.PIG_HII_ROLLUP_OPTIMIZABLE, false); - //If our rule is enabled and is using, there will be a PORollupHIIForEach - //We will create marker tuples which are considered as markers for reducers - //to calculate the remaining results when that reducer goes to the end of the - //input records. This marker tuple will have larger size than the defaut by one - //dimension. This addition dimension will be the value which are ranged from 0 to - //number of reducers. By this addition, we can make sure that every reducers can - //receive these marker tuples to finish their works. - if(isHII) { - int reducerNo = jConf.getInt("mapred.reduce.tasks", 0); - int length = jConf.getInt(PigConstants.PIG_HII_NUMBER_TOTAL_FIELD, 0); - int nAlgebraic = jConf.getInt(PigConstants.PIG_HII_NUMBER_ALGEBRAIC, 1); - - if(length == 0) - return; - - TupleFactory mTupleFactory = TupleFactory.getInstance(); - //An array of marker tuples which has size equals to number of reducers - Tuple group[] = new Tuple[reducerNo]; - int count = 0; - //Make sure that all reducers will receive those marker tuples - while(count < reducerNo) { - //Create marker tuple with last field is the reducer's index, - //the rest are null. - group[count] = mTupleFactory.newTuple(); - for (int k = 0; k <= length; k++) { - if(k < length) { - group[count].append(null); - } else { - group[count].append(count); - } - } - - Tuple value = mTupleFactory.newTuple(); - Tuple []tmp = new Tuple[nAlgebraic]; - long valtmp = 1; - for(int i = 0; i < nAlgebraic; i++){ - tmp[i] = mTupleFactory.newTuple(); - tmp[i].append(valtmp); - value.append(tmp[i]); - } - Tuple out = mTupleFactory.newTuple(); - out.append(0); - out.append(group[count]); - out.append(value); - - PigNullableWritable key = HDataType.getWritableComparableTypes(out.get(1), keyType); - NullableTuple val = new NullableTuple((Tuple)out.get(2)); - oc.write(key, val); - count++; - } - } - super.cleanup(oc); - } - } - - /** * This "specialized" map class is ONLY to be used in pig queries with * order by a udf. A UDF used for comparison in the order by expects * to be handed tuples. Hence this map class ensures that the "key" used @@ -609,7 +515,7 @@ public class PigGenericMapReduce { return; } - if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) { + if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true") && !rp.isEmpty()) { // If there is a stream in the pipeline we could // potentially have more to process - so lets // set the flag stating that all map input has been sent Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Mar 4 18:17:39 2016 @@ -57,6 +57,9 @@ public class PigInputFormat extends Inpu .getLog(PigInputFormat.class); public static final String PIG_INPUTS = "pig.inputs"; + public static final String PIG_INPUT_TARGETS = "pig.inpTargets"; + public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures"; + public static final String PIG_INPUT_LIMITS = "pig.inpLimits"; /** * @deprecated Use {@link UDFContext} instead in the following way to get @@ -109,7 +112,7 @@ public class PigInputFormat extends Inpu List<Long> inpLimitLists = (ArrayList<Long>)ObjectSerializer.deserialize( - conf.get("pig.inpLimits")); + conf.get(PIG_INPUT_LIMITS)); return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, inpLimitLists.get(pigSplit.getInputIndex())); } @@ -171,7 +174,7 @@ public class PigInputFormat extends Inpu Configuration conf) throws IOException { List<String> inpSignatureLists = (ArrayList<String>)ObjectSerializer.deserialize( - conf.get("pig.inpSignatures")); + conf.get(PIG_INPUT_SIGNATURES)); // signature can be null for intermediate jobs where it will not // be required to be passed down if(inpSignatureLists.get(inputIndex) != null) { @@ -197,9 +200,9 @@ public class PigInputFormat extends Inpu PigContext pigContext; try { inputs = (ArrayList<FileSpec>) ObjectSerializer - .deserialize(conf.get("pig.inputs")); + .deserialize(conf.get(PIG_INPUTS)); inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer - .deserialize(conf.get("pig.inpTargets")); + .deserialize(conf.get(PIG_INPUT_TARGETS)); pigContext = (PigContext) ObjectSerializer.deserialize(conf .get("pig.pigContext")); PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list"))); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Fri Mar 4 18:17:39 2016 @@ -21,12 +21,9 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; - import org.apache.pig.impl.io.NullableIntWritable; import org.apache.pig.impl.util.ObjectSerializer; @@ -39,6 +36,7 @@ public class PigIntRawComparator extends super(NullableIntWritable.class); } + @Override public void setConf(Configuration conf) { try { mAsc = (boolean[])ObjectSerializer.deserialize(conf.get( @@ -54,6 +52,7 @@ public class PigIntRawComparator extends } } + @Override public Configuration getConf() { return null; } @@ -63,6 +62,7 @@ public class PigIntRawComparator extends * then IntWritable.compare() is used. If both are null then the indices * are compared. Otherwise the null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; @@ -72,8 +72,10 @@ public class PigIntRawComparator extends int int2 = readInt(b2, s2 + 1); rc = (int1 < int2) ? -1 : ((int1 > int2) ? 1 : 0); } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else rc = 1; } @@ -81,6 +83,7 @@ public class PigIntRawComparator extends return rc; } + @Override public int compare(Object o1, Object o2) { NullableIntWritable niw1 = (NullableIntWritable)o1; NullableIntWritable niw2 = (NullableIntWritable)o2; @@ -90,8 +93,10 @@ public class PigIntRawComparator extends if (!niw1.isNull() && !niw2.isNull()) { rc = ((Integer)niw1.getValueAsPigType()).compareTo((Integer)niw2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (niw1.isNull() && niw2.isNull()) rc = 0; + // Two nulls are equal if indices are same + if (niw1.isNull() && niw2.isNull()) { + rc = niw1.getIndex() - niw2.getIndex(); + } else if (niw1.isNull()) rc = -1; else rc = 1; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Fri Mar 4 18:17:39 2016 @@ -21,22 +21,18 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; - -import org.apache.pig.impl.io.NullableLongWritable; import org.apache.pig.impl.io.NullableLongWritable; import org.apache.pig.impl.util.ObjectSerializer; public class PigLongRawComparator extends WritableComparator implements Configurable { - private final Log mLog = LogFactory.getLog(getClass()); - private boolean[] mAsc; - private LongWritable.Comparator mWrappedComp; + protected final Log mLog = LogFactory.getLog(getClass()); + protected boolean[] mAsc; + protected LongWritable.Comparator mWrappedComp; public PigLongRawComparator() { super(NullableLongWritable.class); @@ -44,6 +40,7 @@ public class PigLongRawComparator extend } + @Override public void setConf(Configuration conf) { try { mAsc = (boolean[])ObjectSerializer.deserialize(conf.get( @@ -59,6 +56,7 @@ public class PigLongRawComparator extend } } + @Override public Configuration getConf() { return null; } @@ -68,6 +66,7 @@ public class PigLongRawComparator extend * then IntWritable.compare() is used. If both are null then the indices * are compared. Otherwise the null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; @@ -75,8 +74,10 @@ public class PigLongRawComparator extend if (b1[s1] == 0 && b2[s2] == 0) { rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else rc = 1; } @@ -84,6 +85,7 @@ public class PigLongRawComparator extend return rc; } + @Override public int compare(Object o1, Object o2) { NullableLongWritable nlw1 = (NullableLongWritable)o1; NullableLongWritable nlw2 = (NullableLongWritable)o2; @@ -93,8 +95,10 @@ public class PigLongRawComparator extend if (!nlw1.isNull() && !nlw2.isNull()) { rc = ((Long)nlw1.getValueAsPigType()).compareTo((Long)nlw2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (nlw1.isNull() && nlw2.isNull()) rc = 0; + // Two nulls are equal if indices are same + if (nlw1.isNull() && nlw2.isNull()) { + rc = nlw1.getIndex() - nlw2.getIndex(); + } else if (nlw1.isNull()) rc = -1; else rc = 1; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Fri Mar 4 18:17:39 2016 @@ -34,6 +34,7 @@ import org.apache.pig.OverwritableStoreF import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.StoreFuncDecorator; import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.Tuple; @@ -74,13 +75,14 @@ public class PigOutputFormat extends Out store = reduceStores.get(0); } StoreFuncInterface sFunc = store.getStoreFunc(); + StoreFuncDecorator decorator = store.getStoreFuncDecorator(); // set output location PigOutputFormat.setLocation(taskattemptcontext, store); // The above call should have update the conf in the JobContext // to have the output location - now call checkOutputSpecs() RecordWriter writer = sFunc.getOutputFormat().getRecordWriter( taskattemptcontext); - return new PigRecordWriter(writer, sFunc, Mode.SINGLE_STORE); + return new PigRecordWriter(writer, decorator, Mode.SINGLE_STORE); } else { // multi store case - in this case, all writing is done through // MapReducePOStoreImpl - set up a dummy RecordWriter @@ -107,18 +109,24 @@ public class PigOutputFormat extends Out private StoreFuncInterface sFunc; /** + * The StoreFuncDecorator we use to write Tuples + */ + private StoreFuncDecorator storeDecorator; + + /** * Single Query or multi query */ private Mode mode; - public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc, + public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncDecorator storeDecorator, Mode mode) throws IOException { this.mode = mode; if(mode == Mode.SINGLE_STORE) { this.wrappedWriter = wrappedWriter; - this.sFunc = sFunc; + this.sFunc = storeDecorator.getStorer(); + this.storeDecorator = storeDecorator; this.sFunc.prepareToWrite(this.wrappedWriter); } } @@ -133,7 +141,7 @@ public class PigOutputFormat extends Out public void write(WritableComparable key, Tuple value) throws IOException, InterruptedException { if(mode == Mode.SINGLE_STORE) { - sFunc.putNext(value); + storeDecorator.putNext(value); } else { throw new IOException("Internal Error: Unexpected code path"); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java Fri Mar 4 18:17:39 2016 @@ -17,19 +17,18 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; +import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.data.TupleRawComparator; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; public class PigSecondaryKeyComparator extends WritableComparator implements Configurable { - private final Log mLog = LogFactory.getLog(getClass()); + private TupleRawComparator mComparator=null; @Override @@ -54,6 +53,7 @@ public class PigSecondaryKeyComparator e return null; } + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { // the last byte of a NullableTuple is its Index @@ -84,6 +84,47 @@ public class PigSecondaryKeyComparator e rc = -1; else rc = 1; + } + return rc; + } + + @SuppressWarnings("unchecked") + @Override + public int compare(WritableComparable a, WritableComparable b) + { + PigNullableWritable wa = (PigNullableWritable)a; + PigNullableWritable wb = (PigNullableWritable)b; + + if ((wa.getIndex() & PigNullableWritable.mqFlag) != 0) { // this is a multi-query index + if ((wa.getIndex() & PigNullableWritable.idxSpace) < (wb.getIndex() & PigNullableWritable.idxSpace)) + return -1; + else if ((wa.getIndex() & PigNullableWritable.idxSpace) > (wb.getIndex() & PigNullableWritable.idxSpace)) + return 1; + // If equal, we fall through + } + + int rc = 0; + // If either are null, handle differently. + if (!wa.isNull() && !wb.isNull()) { + rc = mComparator.compare((Tuple) wa.getValueAsPigType(), (Tuple) wb.getValueAsPigType()); + // handle PIG-927 + // if tuples are equal but any field inside tuple is null, then we do not merge keys + if (rc == 0 && mComparator.hasComparedTupleNull()) + rc = (wa.getIndex() & PigNullableWritable.idxSpace) - (wb.getIndex() & PigNullableWritable.idxSpace); + } else { + // Two nulls are equal if indices are same + if (wa.isNull() && wb.isNull()) { + if ((wa.getIndex() & PigNullableWritable.idxSpace) < (wb.getIndex() & PigNullableWritable.idxSpace)) + rc = -1; + else if ((wa.getIndex() & PigNullableWritable.idxSpace) > (wb.getIndex() & PigNullableWritable.idxSpace)) + rc = 1; + else + rc = 0; + } + else if (wa.isNull()) + rc = -1; + else + rc = 1; } return rc; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Mar 4 18:17:39 2016 @@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; @@ -30,13 +32,15 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; -import java.util.List; -import java.util.HashSet; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; -import java.lang.StringBuilder; +import java.util.Set; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -46,7 +50,10 @@ import org.apache.hadoop.io.serializer.S import org.apache.hadoop.io.serializer.Serializer; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.pig.PigConfiguration; +import org.apache.pig.data.WritableByteArray; import org.apache.pig.impl.plan.OperatorKey; /** @@ -59,19 +66,22 @@ import org.apache.pig.impl.plan.Operator * wrapped InputSplit. */ public class PigSplit extends InputSplit implements Writable, Configurable { + + + private static String FILESPLIT_CLASSNAME = FileSplit.class.getName(); //The operators to which the tuples from this //input file are attached. These are the successors //of the load operator representing this input private ArrayList<OperatorKey> targetOps; // index starting from 0 representing the input number - // So if we have 3 inputs (say for a 3 way join), then the + // So if we have 3 inputs (say for a 3 way join), then the // splits corresponding to the first input will have an index of 0, those // corresponding to the second will have an index of 1 and so on // This will be used to get the LoadFunc corresponding to the input // in PigInputFormat and related code. private int inputIndex; - + // The real InputSplit this split is wrapping private InputSplit[] wrappedSplits; @@ -80,36 +90,36 @@ public class PigSplit extends InputSplit // This will be used by MergeJoinIndexer to record the split # in the // index private int splitIndex; - + // index of current splits being process private int currentIdx; - + // the flag indicates this is a multi-input join (i.e. join) - // so that custom Hadoop counters will be created in the + // so that custom Hadoop counters will be created in the // back-end to track the number of records for each input. private boolean isMultiInputs = false; - + // the flag indicates the custom Hadoop counter should be disabled. // This is to prevent the number of counters exceeding the limit. // This flag is controlled by Pig property "pig.disable.counter" ( // the default value is 'false'). private boolean disableCounter = false; - + /** * the job Configuration */ private Configuration conf; - + /** * total number of splits - required by skew join */ private int totalSplits; - + /** * total length */ private long length = -1; - + /** * overall locations */ @@ -118,8 +128,8 @@ public class PigSplit extends InputSplit // this seems necessary for Hadoop to instatiate this split on the // backend public PigSplit() {} - - public PigSplit(InputSplit[] wrappedSplits, int inputIndex, + + public PigSplit(InputSplit[] wrappedSplits, int inputIndex, List<OperatorKey> targetOps, int splitIndex) { this.wrappedSplits = wrappedSplits; this.inputIndex = inputIndex; @@ -127,30 +137,30 @@ public class PigSplit extends InputSplit this.splitIndex = splitIndex; this.currentIdx = 0; } - + public List<OperatorKey> getTargetOps() { return new ArrayList<OperatorKey>(targetOps); } - + /** - * This methods returns the actual InputSplit (as returned by the + * This methods returns the actual InputSplit (as returned by the * {@link InputFormat}) which this class is wrapping. * @return the wrappedSplit */ public InputSplit getWrappedSplit() { return wrappedSplits[currentIdx]; } - + /** - * + * * @param idx the index into the wrapped splits * @return the specified wrapped split */ public InputSplit getWrappedSplit(int idx) { return wrappedSplits[idx]; } - + @Override @SuppressWarnings("unchecked") public String[] getLocations() throws IOException, InterruptedException { @@ -200,7 +210,7 @@ public class PigSplit extends InputSplit } return length; } - + /** * Return the length of a wrapped split * @param idx the index into the wrapped splits @@ -210,6 +220,7 @@ public class PigSplit extends InputSplit return wrappedSplits[idx].getLength(); } + @Override @SuppressWarnings("unchecked") public void readFields(DataInput is) throws IOException { disableCounter = is.readBoolean(); @@ -220,32 +231,59 @@ public class PigSplit extends InputSplit targetOps = (ArrayList<OperatorKey>) readObject(is); int splitLen = is.readInt(); int distinctSplitClassCount = is.readInt(); + boolean nonFileSplit = false; //construct the input split class name list + String[] distinctSplitClassName = new String[distinctSplitClassCount]; for (int i = 0; i < distinctSplitClassCount; i++) { distinctSplitClassName[i] = is.readUTF(); + if (!distinctSplitClassName[i].equals(FILESPLIT_CLASSNAME)) { + nonFileSplit = true; + } } try { SerializationFactory sf = new SerializationFactory(conf); // The correct call sequence for Deserializer is, we shall open, then deserialize, but we shall not close wrappedSplits = new InputSplit[splitLen]; + + if (splitLen <= 0) { + return; + } + + // Do not compress if everything is FileSplit as it does not compress much + // but adds few seconds for 30K+ tasks + boolean compress = nonFileSplit && conf.getBoolean( + PigConfiguration.PIG_COMPRESS_INPUT_SPLITS, + PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT); + DataInputStream dis = null; + if (compress) { + int numBytes = is.readInt(); + byte[] buf = new byte[numBytes]; + is.readFully(buf, 0, numBytes); + dis = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(buf))); + } + DataInput dataIn = compress ? dis : is; for (int i = 0; i < splitLen; i++) { //read the className index - int index = is.readInt(); + int index = dataIn.readInt(); //get the split class name String splitClassName = distinctSplitClassName[index]; Class splitClass = conf.getClassByName(splitClassName); Deserializer d = sf.getDeserializer(splitClass); - d.open((InputStream) is); + d.open((InputStream) dataIn); wrappedSplits[i] = (InputSplit)ReflectionUtils.newInstance(splitClass, conf); d.deserialize(wrappedSplits[i]); } + if (compress && splitLen > 0) { + dis.close(); + } } catch (ClassNotFoundException e) { throw new IOException(e); } } + @Override @SuppressWarnings("unchecked") public void write(DataOutput os) throws IOException { os.writeBoolean(disableCounter); @@ -262,6 +300,7 @@ public class PigSplit extends InputSplit } List<String> distinctSplitClassList = new ArrayList<String>(); distinctSplitClassList.addAll(splitClassNameSet); + boolean nonFileSplit = distinctSplitClassList.size() > 1 || (!distinctSplitClassList.contains(FILESPLIT_CLASSNAME)); //write the distinct number of split class name os.writeInt(distinctSplitClassList.size()); //write each classname once @@ -270,20 +309,43 @@ public class PigSplit extends InputSplit } SerializationFactory sf = new SerializationFactory(conf); + if (wrappedSplits.length <= 0) { + return; + } + + boolean compress = nonFileSplit && conf.getBoolean( + PigConfiguration.PIG_COMPRESS_INPUT_SPLITS, + PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT); + WritableByteArray byteStream = null; + Deflater deflater = null; + DataOutputStream dos = null; + if (compress) { + byteStream = new WritableByteArray(16384); + deflater = new Deflater(Deflater.BEST_COMPRESSION); + dos = new DataOutputStream(new DeflaterOutputStream(byteStream, deflater)); + } + DataOutput dataOut = compress ? dos : os; for (int i = 0; i < wrappedSplits.length; i++) { //find out the index of the split class name int index = distinctSplitClassList.indexOf(wrappedSplits[i].getClass().getName()); - os.writeInt(index); + dataOut.writeInt(index); Serializer s = sf.getSerializer(wrappedSplits[i].getClass()); //Checks if Serializer is NULL or not before calling open() method on it. if (s == null) { throw new IllegalArgumentException("Could not find Serializer for class "+wrappedSplits[i].getClass()+". InputSplits must implement Writable."); } - s.open((OutputStream) os); + s.open((OutputStream) dataOut); // The correct call sequence for Serializer is, we shall open, then serialize, but we shall not close s.serialize(wrappedSplits[i]); } + if (compress) { + //Get the compressed serialized bytes and write them + dos.close(); + os.writeInt(byteStream.getLength()); + os.write(byteStream.getData(), 0, byteStream.getLength()); + deflater.end(); + } } @@ -292,6 +354,7 @@ public class PigSplit extends InputSplit ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(obj); + oos.flush(); byte[] bytes = baos.toByteArray(); os.writeInt(bytes.length); os.write(bytes); @@ -323,7 +386,7 @@ public class PigSplit extends InputSplit public void setMultiInputs(boolean b) { isMultiInputs = b; } - + /** * Returns true if the map has multiple inputs, else false * @return true if the map has multiple inputs, else false @@ -331,7 +394,7 @@ public class PigSplit extends InputSplit public boolean isMultiInputs() { return isMultiInputs; } - + @Override public Configuration getConf() { return conf; @@ -340,20 +403,20 @@ public class PigSplit extends InputSplit /** (non-Javadoc) * @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration) - * - * This will be called by + * + * This will be called by * {@link PigInputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)} - * to be used in {@link #write(DataOutput)} for serializing the + * to be used in {@link #write(DataOutput)} for serializing the * wrappedSplit - * - * This will be called by Hadoop in the backend to set the right Job + * + * This will be called by Hadoop in the backend to set the right Job * Configuration (hadoop will invoke this method because PigSplit implements * {@link Configurable} - we need this Configuration in readFields() to - * deserialize the wrappedSplit + * deserialize the wrappedSplit */ @Override public void setConf(Configuration conf) { - this.conf = conf; + this.conf = conf; } // package level access because we don't want LoadFunc implementations @@ -362,9 +425,9 @@ public class PigSplit extends InputSplit int getInputIndex() { return inputIndex; } - + /** - * + * * @return the number of wrapped splits */ public int getNumPaths() { @@ -402,7 +465,7 @@ public class PigSplit extends InputSplit wrappedSplits[i].getClass().getName() + "\n Locations:\n"); for (String location : wrappedSplits[i].getLocations()) st.append(" "+location+"\n"); - st.append("\n-----------------------\n"); + st.append("\n-----------------------\n"); } } catch (IOException e) { return null; @@ -419,7 +482,7 @@ public class PigSplit extends InputSplit public boolean disableCounter() { return disableCounter; } - + public void setCurrentIdx(int idx) { this.currentIdx = idx; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Fri Mar 4 18:17:39 2016 @@ -21,13 +21,10 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; - import org.apache.pig.impl.io.NullableText; import org.apache.pig.impl.util.ObjectSerializer; @@ -43,6 +40,7 @@ public class PigTextRawComparator extend } + @Override public void setConf(Configuration conf) { try { mAsc = (boolean[])ObjectSerializer.deserialize(conf.get( @@ -57,6 +55,7 @@ public class PigTextRawComparator extend } } + @Override public Configuration getConf() { return null; } @@ -66,6 +65,7 @@ public class PigTextRawComparator extend * then IntWritable.compare() is used. If both are null then the indices * are compared. Otherwise the null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; @@ -73,8 +73,10 @@ public class PigTextRawComparator extend if (b1[s1] == 0 && b2[s2] == 0) { rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); } else { - // For sorting purposes two nulls are equal. - if (b1[s1] != 0 && b2[s2] != 0) rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else rc = 1; } @@ -82,6 +84,7 @@ public class PigTextRawComparator extend return rc; } + @Override public int compare(Object o1, Object o2) { NullableText nt1 = (NullableText)o1; NullableText nt2 = (NullableText)o2; @@ -91,8 +94,10 @@ public class PigTextRawComparator extend if (!nt1.isNull() && !nt2.isNull()) { rc = ((String)nt1.getValueAsPigType()).compareTo((String)nt2.getValueAsPigType()); } else { - // For sorting purposes two nulls are equal. - if (nt1.isNull() && nt2.isNull()) rc = 0; + // Two nulls are equal if indices are same + if (nt1.isNull() && nt2.isNull()) { + rc = nt1.getIndex() - nt2.getIndex(); + } else if (nt1.isNull()) rc = -1; else rc = 1; }
