Author: gates Date: Wed Jul 9 15:20:46 2008 New Revision: 675362 URL: http://svn.apache.org/viewvc?rev=675362&view=rev Log: PIG-292 Fixes many sort issues. Primary among them is that DataByteArray.compareTo now works properly and keyComparatorClass for the Hadoop job is now chosen correctly in ORDER BY cases.
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Jul 9 15:20:46 2008 @@ -235,6 +235,8 @@ public ExecJob execute(PhysicalPlan plan, String jobName) throws ExecException { try { + FileSpec spec = checkLeafIsStore(plan); + /* PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0); FileSpec spec = null; if(!(leaf instanceof POStore)){ @@ -251,6 +253,7 @@ else{ spec = ((POStore)leaf).getSFile(); } + */ MapReduceLauncher launcher = new MapReduceLauncher(); boolean success = launcher.launchPig(plan, jobName, pigContext); @@ -277,8 +280,13 @@ try { PlanPrinter printer = new PlanPrinter(plan); printer.visit(); - System.out.println(); - } catch (VisitorException ve) { + stream.println(); + + checkLeafIsStore(plan); + + MapReduceLauncher launcher = new MapReduceLauncher(); + launcher.explain(plan, pigContext, stream); + } catch (Exception ve) { throw new RuntimeException(ve); } } @@ -459,7 +467,31 @@ InetAddress.getByName(parts[0]); return parts[0] + ":" + parts[1]; } - + + private FileSpec checkLeafIsStore(PhysicalPlan plan) throws ExecException { + try { + PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0); + FileSpec spec = null; + if(!(leaf instanceof POStore)){ + String scope = leaf.getOperatorKey().getScope(); + POStore str = new POStore(new OperatorKey(scope, + NodeIdGenerator.getGenerator().getNextNodeId(scope))); + str.setPc(pigContext); + spec = new FileSpec(FileLocalizer.getTemporaryPath(null, + pigContext).toString(), + BinStorage.class.getName()); + str.setSFile(spec); + plan.addAsLeaf(str); + } else{ + spec = ((POStore)leaf).getSFile(); + } + return spec; + } catch (Exception e) { + throw new ExecException(e); + } + } + + } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Wed Jul 9 15:20:46 2008 @@ -50,6 +50,7 @@ import org.apache.pig.impl.mapReduceLayer.LocalLauncher; import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.plans.PlanPrinter; import org.apache.pig.impl.physicalLayer.relationalOperators.POStore; import org.apache.pig.impl.plan.VisitorException; import java.util.Iterator; @@ -163,7 +164,16 @@ } public void explain(PhysicalPlan plan, PrintStream stream) { - // TODO FIX + try { + PlanPrinter printer = new PlanPrinter(plan); + printer.visit(); + stream.println(); + + LocalLauncher launcher = new LocalLauncher(); + launcher.explain(plan, pigContext, stream); + } catch (Exception ve) { + throw new RuntimeException(ve); + } } public Collection<ExecJob> runningJobs(Properties properties) throws ExecException { Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java Wed Jul 9 15:20:46 2008 @@ -110,22 +110,29 @@ return new String(mData); } + /** + * Compare two byte arrays. Comparison is done first using byte values + * then length. 
So "g" will be greater than "abcdefg", but "hello worlds" + * is greater than "hello world". If the other object is not a + * DataByteArray, DataType.compare will be called. + * @param other Other object to compare to. + * @return -1 if less than, 1 if greater than, 0 if equal. + */ public int compareTo(Object other) { if (other instanceof DataByteArray) { DataByteArray dba = (DataByteArray)other; int mySz = mData.length; int tSz = dba.mData.length; - if (tSz < mySz) { - return 1; - } else if (tSz > mySz) { - return -1; - } else { - for (int i = 0; i < mySz; i++) { - if (mData[i] < dba.mData[i]) return -1; - else if (mData[i] > dba.mData[i]) return 1; - } - return 0; + int i; + for (i = 0; i < mySz; i++) { + // If the other has run out of characters, we're bigger. + if (i >= tSz) return 1; + if (mData[i] < dba.mData[i]) return -1; + else if (mData[i] > dba.mData[i]) return 1; } + // If the other still has characters left, it's greater + if (i < tSz) return -1; + return 0; } else { return DataType.compare(this, other); } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Wed Jul 9 15:20:46 2008 @@ -18,6 +18,7 @@ package org.apache.pig.impl.builtin; import java.io.IOException; +import java.util.Comparator; import java.util.Iterator; import org.apache.pig.EvalFunc; @@ -30,6 +31,14 @@ public class FindQuantiles extends EvalFunc<DataBag>{ BagFactory mBagFactory = BagFactory.getInstance(); + private class SortComparator implements Comparator<Tuple> { + public int compare(Tuple t1, Tuple t2) { + return t1.compareTo(t2); + } + } 
+ + private Comparator<Tuple> mComparator = new SortComparator(); + /** * first field in the input tuple is the number of quantiles to generate * second field is the *sorted* bag of samples @@ -47,7 +56,8 @@ ioe.initCause(e); throw ioe; } - DataBag output = mBagFactory.newDefaultBag(); + // TODO If user provided a comparator we should be using that. + DataBag output = mBagFactory.newSortedBag(mComparator); long numSamples = samples.size(); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Wed Jul 9 15:20:46 2008 @@ -93,21 +93,21 @@ @Override public Schema getSchema() throws FrontendException { - log.info("Entering getSchema"); + log.trace("Entering getSchema"); if (!mIsSchemaComputed) { List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>( mForEachPlans.size()); for (LogicalPlan plan : mForEachPlans) { - log.info("Number of leaves in " + plan + " = " + plan.getLeaves().size()); + log.debug("Number of leaves in " + plan + " = " + plan.getLeaves().size()); for(int i = 0; i < plan.getLeaves().size(); ++i) { - log.info("Leaf" + i + "= " + plan.getLeaves().get(i)); + log.debug("Leaf" + i + "= " + plan.getLeaves().get(i)); } //LogicalOperator op = plan.getRoots().get(0); LogicalOperator op = plan.getLeaves().get(0); - log.info("op: " + op.getClass().getName() + " " + op); + log.debug("op: " + op.getClass().getName() + " " + op); } - log.info("Printed the leaves of the generate plans"); + log.debug("Printed the leaves of the generate plans"); Map<Schema.FieldSchema, String> flattenAlias = new 
HashMap<Schema.FieldSchema, String>(); Map<String, Boolean> inverseFlattenAlias = new HashMap<String, Boolean>(); @@ -116,19 +116,19 @@ for (int planCtr = 0; planCtr < mForEachPlans.size(); ++planCtr) { LogicalPlan plan = mForEachPlans.get(planCtr); LogicalOperator op = plan.getLeaves().get(0); - log.info("op: " + op.getClass().getName() + " " + op); - log.info("Flatten: " + mFlatten.get(planCtr)); + log.debug("op: " + op.getClass().getName() + " " + op); + log.debug("Flatten: " + mFlatten.get(planCtr)); Schema.FieldSchema planFs; try { planFs = ((ExpressionOperator)op).getFieldSchema(); - log.info("planFs: " + planFs); + log.debug("planFs: " + planFs); if(null != planFs) { String outerCanonicalAlias = op.getAlias(); if(null == outerCanonicalAlias) { outerCanonicalAlias = planFs.alias; } - log.info("Outer canonical alias: " + outerCanonicalAlias); + log.debug("Outer canonical alias: " + outerCanonicalAlias); if(mFlatten.get(planCtr)) { //need to extract the children and create the aliases //assumption here is that flatten is only for one column @@ -137,8 +137,8 @@ Schema s = planFs.schema; if(null != s) { for(Schema.FieldSchema fs: s.getFields()) { - log.info("fs: " + fs); - log.info("fs.alias: " + fs.alias); + log.debug("fs: " + fs); + log.debug("fs.alias: " + fs.alias); String innerCanonicalAlias = fs.alias; if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) { String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias; @@ -196,19 +196,19 @@ //check for duplicate column names and throw an error if there are duplicates //ensure that flatten gets rid of duplicate column names when the checks are //being done - log.info(" flattenAlias: " + flattenAlias); - log.info(" inverseFlattenAlias: " + inverseFlattenAlias); - log.info(" aliases: " + aliases); - log.info(" fss.size: " + fss.size()); + log.debug(" flattenAlias: " + flattenAlias); + log.debug(" inverseFlattenAlias: " + inverseFlattenAlias); + log.debug(" aliases: " + aliases); + 
log.debug(" fss.size: " + fss.size()); boolean duplicates = false; Map<String, Integer> duplicateAliases = new HashMap<String, Integer>(); for(String alias: aliases.keySet()) { Integer count = aliases.get(alias); if(count > 1) {//not checking for null here as counts are intitalized to 1 Boolean inFlatten = false; - log.info("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias); + log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias); inFlatten = inverseFlattenAlias.get(alias); - log.info("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias); + log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias); if((null == inFlatten) || (!inFlatten)) { duplicates = true; duplicateAliases.put(alias, count); @@ -237,14 +237,14 @@ String alias = flattenAlias.get(fs); Integer count = aliases.get(alias); if (null == count) count = 1; - log.info("alias: " + alias); + log.debug("alias: " + alias); if((null != alias) && (count == 1)) { mSchema.addAlias(alias, fs); } } mIsSchemaComputed = true; } - log.info("Exiting getSchema"); + log.trace("Exiting getSchema"); return mSchema; } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Wed Jul 9 15:20:46 2008 @@ -88,6 +88,10 @@ mSortFunc = func; } + public boolean isStar() { + return mIsStar; + } + public void setStar(boolean b) { mIsStar = b; } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java Wed Jul 9 15:20:46 2008 @@ -108,6 +108,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -137,6 +138,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -165,6 +167,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -193,6 +196,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -221,6 +225,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -249,6 +254,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -276,6 +282,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -303,6 +310,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -330,6 +338,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -357,6 +366,7 @@ 
} catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -384,6 +394,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -411,6 +422,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -434,6 +446,7 @@ currentPlan.connect(from, exprOp); } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -457,6 +470,7 @@ currentPlan.connect(from, exprOp); } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -478,6 +492,7 @@ currentPlan.connect(from, exprOp); } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -501,6 +516,7 @@ } catch (PlanException e1) { log.error("Invalid physical operators in the physical plan" + e1.getMessage()); + throw new VisitorException(e1); } int count = 0; @@ -543,6 +559,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -582,6 +599,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -612,6 +630,7 @@ log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } } @@ -657,6 +676,7 @@ } catch (PlanException e) { log.error("Invalid physical operators in the physical plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -703,6 +723,7 @@ currentPlan.connect(from, sort); } catch (PlanException e) { log.error("Invalid physical 
operator in the plan" + e.getMessage()); + throw new VisitorException(e); } sort.setResultType(s.getType()); @@ -727,6 +748,7 @@ currentPlan.connect(from, physOp); } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -744,6 +766,7 @@ currentPlan.connect(from, physOp); } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -772,6 +795,7 @@ currentPlan.connect(from, physOp); } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -799,6 +823,7 @@ } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } LogToPhyMap.put(func, p); @@ -834,6 +859,7 @@ currentPlan.connect(from, store); } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } LogToPhyMap.put(loStore, store); } @@ -878,6 +904,7 @@ } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -899,6 +926,7 @@ currentPlan.connect(from, physOp); } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -918,6 +946,7 @@ currentPlan.connect(from, physOp); } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -939,6 +968,7 @@ currentPlan.connect(from, physOp); } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -959,6 +989,7 @@ currentPlan.connect(from, physOp); } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } @@ -980,6 
+1011,7 @@ } catch (PlanException e) { log.error("Invalid physical operator in the plan" + e.getMessage()); + throw new VisitorException(e); } } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Wed Jul 9 15:20:46 2008 @@ -25,13 +25,24 @@ import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataByteArray; import org.apache.pig.ComparisonFunc; import org.apache.pig.data.DataType; import org.apache.pig.data.IndexedTuple; @@ -61,6 +72,8 @@ Configuration conf; PigContext pigContext; + private final Log log = LogFactory.getLog(getClass()); + /** * The map between MapReduceOpers and their corresponding Jobs */ @@ -257,9 +270,7 @@ jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack)); Class<? 
extends WritableComparable> keyClass = DataType.getWritableComparableTypes(pack.getKeyType()).getClass(); jobConf.setOutputKeyClass(keyClass); - if(keyClass.equals(TupleFactory.getInstance().tupleClass())){ - jobConf.setOutputKeyComparatorClass(PigWritableComparator.class); - } + selectComparator(mro, pack.getKeyType(), jobConf); jobConf.setOutputValueClass(IndexedTuple.class); } @@ -290,12 +301,128 @@ } public static class PigWritableComparator extends WritableComparator { - public PigWritableComparator() { - super(TupleFactory.getInstance().tupleClass()); + protected PigWritableComparator(Class c) { + super(c); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2); } } + + public static class PigIntWritableComparator extends PigWritableComparator { + public PigIntWritableComparator() { + super(IntWritable.class); + } + } + + public static class PigLongWritableComparator extends PigWritableComparator { + public PigLongWritableComparator() { + super(LongWritable.class); + } + } + + public static class PigFloatWritableComparator extends PigWritableComparator { + public PigFloatWritableComparator() { + super(FloatWritable.class); + } + } + + /* + public static class PigDoubleWritableComparator extends PigWritableComparator { + public PigDoubleWritableComparator() { + super(Double.class); + } + } + */ + + public static class PigCharArrayWritableComparator extends PigWritableComparator { + public PigCharArrayWritableComparator() { + super(Text.class); + } + } + + public static class PigDBAWritableComparator extends PigWritableComparator { + public PigDBAWritableComparator() { + super(BytesWritable.class); + } + } + + public static class PigTupleWritableComparator extends PigWritableComparator { + public PigTupleWritableComparator() { + super(TupleFactory.getInstance().tupleClass()); + } + } + + public static class PigBagWritableComparator extends PigWritableComparator { + public 
PigBagWritableComparator() { + super(BagFactory.getInstance().newDefaultBag().getClass()); + } + } + + private void selectComparator( + MapReduceOper mro, + byte keyType, + JobConf jobConf) throws JobCreationException { + // If this operator is involved in an order by, use the native + // comparators. Otherwise use bytewise comparison. Have to + // look at the next operator too because if we're the quantile + // operation we need to use the native comparators. + boolean involved = false; + if (mro.isGlobalSort()) { + involved = true; + } else { + List<MapReduceOper> succs = plan.getSuccessors(mro); + if (succs != null) { + MapReduceOper succ = succs.get(0); + if (succ.isGlobalSort()) involved = true; + } + } + if (!involved) { + switch (keyType) { + case DataType.INTEGER: + jobConf.setOutputKeyComparatorClass(PigIntWritableComparator.class); + break; + + case DataType.LONG: + jobConf.setOutputKeyComparatorClass(PigLongWritableComparator.class); + break; + + case DataType.FLOAT: + jobConf.setOutputKeyComparatorClass(PigFloatWritableComparator.class); + break; + + case DataType.DOUBLE: + //jobConf.setOutputKeyComparatorClass(PigDoubleWritableComparator.class); + log.error("Waiting for Hadoop to support DoubleWritable"); + throw new JobCreationException("Waiting for Hadoop to support DoubleWritable"); + + case DataType.CHARARRAY: + jobConf.setOutputKeyComparatorClass(PigCharArrayWritableComparator.class); + break; + + case DataType.BYTEARRAY: + jobConf.setOutputKeyComparatorClass(PigDBAWritableComparator.class); + break; + + case DataType.MAP: + log.error("Using Map as key not supported."); + throw new JobCreationException("Using Map as key not supported"); + + case DataType.TUPLE: + jobConf.setOutputKeyComparatorClass(PigTupleWritableComparator.class); + break; + + case DataType.BAG: + jobConf.setOutputKeyComparatorClass(PigBagWritableComparator.class); + break; + + default: + throw new RuntimeException("Forgot case for type " + + DataType.findTypeName(keyType)); + } 
+ + } + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java Wed Jul 9 15:20:46 2008 @@ -1,6 +1,7 @@ package org.apache.pig.impl.mapReduceLayer; import java.io.IOException; +import java.io.PrintStream; import java.util.List; import org.apache.commons.logging.Log; @@ -62,6 +63,22 @@ public abstract boolean launchPig(PhysicalPlan php, String grpName, PigContext pc) throws PlanException, VisitorException, IOException, ExecException, JobCreationException; + + /** + * Explain how a pig job will be executed on the underlying + * infrastructure. + * @param pp PhysicalPlan to explain + * @param pc PigContext to use for configuration + * @param ps PrintStream to write output on. 
+ * @throws VisitorException + * @throws IOException + */ + public abstract void explain( + PhysicalPlan pp, + PigContext pc, + PrintStream ps) throws PlanException, + VisitorException, + IOException; protected boolean isComplete(double prog){ return (int)(Math.ceil(prog)) == (int)1; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java Wed Jul 9 15:20:46 2008 @@ -1,6 +1,7 @@ package org.apache.pig.impl.mapReduceLayer; import java.io.IOException; +import java.io.PrintStream; import java.util.List; import org.apache.commons.logging.Log; @@ -13,6 +14,7 @@ import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan; +import org.apache.pig.impl.mapReduceLayer.plans.MRPrinter; import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; @@ -79,7 +81,21 @@ return isComplete(lastProg); } - + + @Override + public void explain(PhysicalPlan php, + PigContext pc, + PrintStream ps) throws PlanException, + VisitorException, + IOException { + MRCompiler comp = new MRCompiler(php, pc); + comp.compile(); + MROperPlan mrp = comp.getMRPlan(); + + MRPrinter printer = new MRPrinter(ps, mrp); + printer.visit(); + } + //A purely testing method. 
Not to be used elsewhere public boolean launchPigWithCombinePlan(PhysicalPlan php, String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException, Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Wed Jul 9 15:20:46 2008 @@ -179,14 +179,6 @@ */ public MROperPlan compile() throws IOException, PlanException, VisitorException { List<PhysicalOperator> leaves = plan.getLeaves(); - /*for (PhysicalOperator operator : leaves) { - compile(operator); - if (!curMROp.isMapDone()) { - curMROp.setMapDone(true); - } else if (!curMROp.isReduceDone()) { - curMROp.setReduceDone(true); - } - }*/ POStore store = (POStore)leaves.get(0); compile(store); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Wed Jul 9 15:20:46 2008 @@ -1,6 +1,7 @@ package org.apache.pig.impl.mapReduceLayer; import java.io.IOException; +import java.io.PrintStream; import java.util.List; import org.apache.commons.logging.Log; @@ -15,6 +16,7 @@ import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; import 
org.apache.pig.impl.PigContext; import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan; +import org.apache.pig.impl.mapReduceLayer.plans.MRPrinter; import org.apache.pig.impl.physicalLayer.PhysicalOperator; import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.plan.PlanException; @@ -83,4 +85,19 @@ return isComplete(lastProg); } + + @Override + public void explain(PhysicalPlan php, + PigContext pc, + PrintStream ps) throws PlanException, + VisitorException, + IOException { + MRCompiler comp = new MRCompiler(php, pc); + comp.compile(); + MROperPlan mrp = comp.getMRPlan(); + + MRPrinter printer = new MRPrinter(ps, mrp); + printer.visit(); + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java Wed Jul 9 15:20:46 2008 @@ -53,7 +53,7 @@ * key and indexed tuple and collect it into the output * collector. * - * The shuffle and sort phase sorts these key & indexed tuples + * The shuffle and sort phase sorts these key & indexed tuples * and creates key, List<IndexedTuple> and passes the key and * iterator to the list. 
The deserialized POPackage operator * is used to package the key, List<IndexedTuple> into pigKey, Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Wed Jul 9 15:20:46 2008 @@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.Partitioner; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.builtin.BinStorage; +import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.BufferedPositionedInputStream; import org.apache.pig.impl.io.FileLocalizer; @@ -63,10 +64,18 @@ t = loader.getNext(); if (t==null) break; - quantiles.add(t); + // Need to strip the outer tuple and bag. 
+ Object o = t.get(0); + if (o instanceof DataBag) { + for (Tuple it : (DataBag)o) { + quantiles.add(it); + } + } else { + quantiles.add(t); + } } this.quantiles = quantiles.toArray(new Tuple[0]); - }catch (IOException e){ + }catch (Exception e){ throw new RuntimeException(e); } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java Wed Jul 9 15:20:46 2008 @@ -32,13 +32,7 @@ super(plan, walker); } - @Override - public void visit() throws VisitorException { - // TODO Auto-generated method stub - - } - - public void visitMROp(MapReduceOper mr) { + public void visitMROp(MapReduceOper mr) throws VisitorException { // do nothing } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java Wed Jul 9 15:20:46 2008 @@ -22,6 +22,8 @@ import java.util.List; import java.io.PrintStream; +import org.apache.pig.impl.mapReduceLayer.MapReduceOper; +import org.apache.pig.impl.physicalLayer.plans.PlanPrinter; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ 
-38,252 +40,39 @@ * @param plan MR plan to print */ public MRPrinter(PrintStream ps, MROperPlan plan) { - super(plan, new DepthFirstWalker(plan)); - } - - /* TODO FIX - public void visit(LOAdd a) throws VisitorException { - visitBinary(a, "+"); - } - - public void visit(LOAnd a) throws VisitorException { - visitBinary(a, "AND"); - } - - public void visit(LOBinCond bc) throws VisitorException { - print(bc); - mStream.print(" COND: ("); - bc.getCond().visit(this); - mStream.print(") TRUE: ("); - bc.getLhsOp().visit(this); - mStream.print(") FALSE ("); - bc.getRhsOp().visit(this); - mStream.print(")"); - } - - public void visit(LOCogroup g) throws VisitorException { - print(g); - mStream.print("GROUP BY PLANS:"); - MultiMap<LogicalOperator, LogicalPlan> plans = g.getGroupByPlans(); - for (LogicalOperator lo : plans.keySet()) { - // Visit the associated plans - for (LogicalPlan plan : plans.get(lo)) { - mIndent++; - pushWalker(new DepthFirstWalker(plan)); - visit(); - popWalker(); - mIndent--; - } - mStream.println(); - } - // Visit input operators - for (LogicalOperator lo : plans.keySet()) { - // Visit the operator - lo.visit(this); - } - } - - public void visit(LOConst c) throws VisitorException { - print(c); - mStream.print(" VALUE (" + c.getValue() + ")"); - } - - public void visit(LOCross c) throws VisitorException { - print(c); - mStream.println(); - super.visit(c); - } - - public void visit(LODistinct d) throws VisitorException { - print(d); - mStream.println(); - super.visit(d); - } - - public void visit(LODivide d) throws VisitorException { - visitBinary(d, "/"); - } - - public void visit(LOEqual e) throws VisitorException { - visitBinary(e, "=="); - } - - public void visit(LOFilter f) throws VisitorException { - print(f); - mStream.print(" COMP: "); - mIndent++; - pushWalker(new DepthFirstWalker(f.getComparisonPlan())); - visit(); - mIndent--; - mStream.println(); - f.getInput().visit(this); - } - - public void visit(LOForEach f) throws VisitorException { - 
print(f); - mStream.print(" PLAN: "); - mIndent++; - pushWalker(new DepthFirstWalker(f.getForEachPlan())); - visit(); - mIndent--; - mStream.println(); - // Visit our input - mPlan.getPredecessors((LogicalOperator)f).get(0).visit(this); - } - - public void visit(LOGreaterThan gt) throws VisitorException { - visitBinary(gt, ">"); - } - - public void visit(LOGreaterThanEqual gte) throws VisitorException { - visitBinary(gte, ">="); - } - - public void visit(LOLesserThan lt) throws VisitorException { - visitBinary(lt, "<"); - } - - public void visit(LOLesserThanEqual lte) throws VisitorException { - visitBinary(lte, "<="); - } - - public void visit(LOLoad load) throws VisitorException { - print(load); - mStream.print(" FILE: " + load.getInputFile().getFileName()); - mStream.print(" FUNC: " + load.getLoadFunc().getClass().getName()); - mStream.println(); - } - - public void visit(LOMapLookup mlu) throws VisitorException { - print(mlu); - mStream.print("("); - mlu.getMap().visit(this); - mStream.print(")# " + mlu.getKey()); - } - - public void visit(LOMod m) throws VisitorException { - visitBinary(m, "MOD"); - } - - public void visit(LOMultiply m) throws VisitorException { - visitBinary(m, "*"); - } - - public void visit(LONegative n) throws VisitorException { - visitUnary(n, "-"); - } - - public void visit(LONot n) throws VisitorException { - visitUnary(n, "NOT"); - } - - public void visit(LONotEqual ne) throws VisitorException { - visitBinary(ne, "!="); - } - - public void visit(LOOr or) throws VisitorException { - visitBinary(or, "OR"); - } - - public void visit(LOProject p) throws VisitorException { - print(p); - if (p.isStar()) { - mStream.print(" ALL "); - } else { - List<Integer> cols = p.getProjection(); - mStream.print(" COL"); - if (cols.size() > 1) mStream.print("S"); - mStream.print(" ("); - for (int i = 0; i < cols.size(); i++) { - if (i > 0) mStream.print(", "); - mStream.print(cols.get(i)); - } - mStream.print(")"); - } - mStream.print(" FROM "); - if 
(p.getSentinel()) { - // This project is connected to some other relation, don't follow - // that path or we'll cycle in the graph. - p.getExpression().name(); - } else { - mIndent++; - p.getExpression().visit(this); - mIndent--; - } - } - - public void visit(LORegexp r) throws VisitorException { - print(r); - mStream.print(" REGEX (" + r.getRegexp() + ") LOOKING IN ("); - r.getOperand().visit(this); - mStream.print(")"); - } - - private void print(LogicalOperator lo, String name) { - List<EvalSpec> empty = new ArrayList<EvalSpec>(); - print(lo, name, empty); - } - - private void visitBinary( - BinaryExpressionOperator b, - String op) throws VisitorException { - print(b); - mStream.print(" ("); - b.getLhsOperand().visit(this); - mStream.print(") " + op + " ("); - b.getRhsOperand().visit(this); - mStream.print(") "); - } - - private void visitUnary( - UnaryExpressionOperator e, - String op) throws VisitorException { - print(e); - mStream.print(op + " ("); - e.getOperand().visit(this); - mStream.print(") "); - } - - private void print(LogicalOperator lo) { - for (int i = 0; i < mIndent; i++) mStream.print(" "); - - printName(lo); - - if (!(lo instanceof ExpressionOperator)) { - mStream.print("Inputs: "); - for (LogicalOperator predecessor : mPlan.getPredecessors(lo)) { - printName(predecessor); - } - mStream.print("Schema: "); - try { - printSchema(lo.getSchema()); - } catch (FrontendException fe) { - // ignore it, nothing we can do - mStream.print("()"); - } - } - mStream.print(" : "); - } - - private void printName(LogicalOperator lo) { - mStream.println(lo.name() + " key(" + lo.getOperatorKey().scope + - ", " + lo.getOperatorKey().id + ") "); - } - - private void printSchema(Schema schema) { - mStream.print("("); - for (Schema.FieldSchema fs : schema.getFields()) { - if (fs.alias != null) mStream.print(fs.alias + ": "); - mStream.print(DataType.findTypeName(fs.type)); - if (fs.schema != null) { - if (fs.type == DataType.BAG) mStream.print("{"); - 
printSchema(fs.schema); - if (fs.type == DataType.BAG) mStream.print("}"); - } + super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan)); + mStream = ps; + mStream.println("--------------------------------------------------"); + mStream.println("| Map Reduce Plan |"); + mStream.println("--------------------------------------------------"); + } + + @Override + public void visitMROp(MapReduceOper mr) throws VisitorException { + mStream.println("MapReduce node " + mr.getOperatorKey().toString()); + if (mr.mapPlan != null && mr.mapPlan.size() > 0) { + mStream.println("Map Plan"); + PlanPrinter printer = new PlanPrinter(mr.mapPlan); + printer.visit(); + mStream.println("--------"); + } + if (mr.combinePlan != null && mr.combinePlan.size() > 0) { + mStream.println("Combine Plan"); + PlanPrinter printer = new PlanPrinter(mr.combinePlan); + printer.visit(); + mStream.println("--------"); + } + if (mr.reducePlan != null && mr.reducePlan.size() > 0) { + mStream.println("Reduce Plan"); + PlanPrinter printer = new PlanPrinter(mr.reducePlan); + printer.visit(); + mStream.println("--------"); + } + mStream.println("Global sort: " + mr.isGlobalSort()); + if (mr.getQuantFile() != null) { + mStream.println("Quantile file: " + mr.getQuantFile()); } - mStream.print(")"); + mStream.println("----------------"); } - */ } - Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java Wed Jul 9 15:20:46 2008 @@ -66,6 +66,7 @@ private List<Boolean> mAscCols; private 
POUserComparisonFunc mSortFunc; private final Log log = LogFactory.getLog(getClass()); + private Comparator<Tuple> mComparator; private boolean inputsAccumulated = false; public boolean isUDFComparatorUsed = false; @@ -80,16 +81,18 @@ this.mAscCols = mAscCols; this.mSortFunc = mSortFunc; if (mSortFunc == null) { - sortedBag = BagFactory.getInstance().newSortedBag( - new SortComparator()); + mComparator = new SortComparator(); + /*sortedBag = BagFactory.getInstance().newSortedBag( + new SortComparator());*/ ExprOutputTypes = new ArrayList<Byte>(sortPlans.size()); for(PhysicalPlan plan : sortPlans) { ExprOutputTypes.add(plan.getLeaves().get(0).getResultType()); } } else { - sortedBag = BagFactory.getInstance().newSortedBag( - new UDFSortComparator()); + /*sortedBag = BagFactory.getInstance().newSortedBag( + new UDFSortComparator());*/ + mComparator = new UDFSortComparator(); isUDFComparatorUsed = true; } } @@ -171,6 +174,15 @@ case DataType.LONG: res = Op.getNext(dummyLong); break; + case DataType.TUPLE: + res = Op.getNext(dummyTuple); + break; + + default: + String msg = new String("Did not expect result of type " + + DataType.findTypeName(resultType)); + log.error(msg); + throw new RuntimeException(msg); } return res; } @@ -220,6 +232,7 @@ Result res = new Result(); if (!inputsAccumulated) { res = processInput(); + sortedBag = BagFactory.getInstance().newSortedBag(mComparator); while (res.returnStatus != POStatus.STATUS_EOP) { if (res.returnStatus == POStatus.STATUS_ERR) { log.error("Error in reading from the inputs"); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java?rev=675362&r1=675361&r2=675362&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java (original) +++ 
incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Wed Jul 9 15:20:46 2008 @@ -314,6 +314,67 @@ } @Test + public void testMultiFieldTupleCompareTo() throws Exception { + TupleFactory tf = TupleFactory.getInstance(); + + Tuple t1 = tf.newTuple(); + Tuple t2 = tf.newTuple(); + + t1.append(new DataByteArray("bbb")); + t1.append(new DataByteArray("bbb")); + t2.append(new DataByteArray("bbb")); + t2.append(new DataByteArray("bbb")); + + assertEquals("same data equal", 0, t1.compareTo(t2)); + + t2 = tf.newTuple(); + t2.append(new DataByteArray("aaa")); + t2.append(new DataByteArray("aaa")); + assertEquals("greater than tuple with lesser value", 1, t1.compareTo(t2)); + + t2 = tf.newTuple(); + t2.append(new DataByteArray("ddd")); + t2.append(new DataByteArray("ddd")); + assertEquals("less than tuple with greater value", -1, t1.compareTo(t2)); + + // First column same, second lesser + t2 = tf.newTuple(); + t2.append(new DataByteArray("bbb")); + t2.append(new DataByteArray("aaa")); + assertEquals("greater than tuple with lesser value", 1, t1.compareTo(t2)); + + // First column same, second greater + t2 = tf.newTuple(); + t2.append(new DataByteArray("bbb")); + t2.append(new DataByteArray("ccc")); + assertEquals("greater than tuple with lesser value", -1, t1.compareTo(t2)); + + // First column less, second same + t2 = tf.newTuple(); + t2.append(new DataByteArray("aaa")); + t2.append(new DataByteArray("bbb")); + assertEquals("greater than tuple with lesser value", 1, t1.compareTo(t2)); + + // First column greater, second same + t2 = tf.newTuple(); + t2.append(new DataByteArray("ccc")); + t2.append(new DataByteArray("bbb")); + assertEquals("greater than tuple with lesser value", -1, t1.compareTo(t2)); + + // First column less, second greater + t2 = tf.newTuple(); + t2.append(new DataByteArray("aaa")); + t2.append(new DataByteArray("ccc")); + assertEquals("greater than tuple with lesser value", 1, t1.compareTo(t2)); + + // First column greater, second same + 
t2 = tf.newTuple(); + t2.append(new DataByteArray("ccc")); + t2.append(new DataByteArray("aaa")); + assertEquals("greater than tuple with lesser value", -1, t1.compareTo(t2)); + } + + @Test public void testByteArrayToString() throws Exception { DataByteArray ba = new DataByteArray("hello world"); @@ -350,10 +411,23 @@ assertTrue("same data", ba1.compareTo(ba2) == 0); - assertFalse("lexically lower value less than", + assertTrue("different length lexically lower value less than", ba3.compareTo(ba1) < 0); - assertFalse("lexically higher value greater than", + assertTrue("different length lexically higher value greater than", ba1.compareTo(ba3) > 0); + + ba2 = new DataByteArray("hello worlc"); + assertTrue("same length lexically lower value less than", + ba2.compareTo(ba1) < 0); + assertTrue("same length lexically higher value greater than", + ba1.compareTo(ba2) > 0); + + ba2 = new DataByteArray("hello worlds"); + assertTrue("shorter lexically same value less than", + ba1.compareTo(ba2) < 0); + assertTrue("longer lexically same value greater than", + ba2.compareTo(ba1) > 0); + } private Tuple giveMeOneOfEach() throws Exception {