Author: cutting Date: Mon May 14 11:00:10 2007 New Revision: 537926 URL: http://svn.apache.org/viewvc?view=rev&rev=537926 Log: HADOOP-1342. In aggregators, permit one to limit the number of unique values per key. Contributed by Runping.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestAggregates.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=537926&r1=537925&r2=537926 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon May 14 11:00:10 2007 @@ -11,6 +11,9 @@ 3. HADOOP-1344. Add RunningJob#getJobName(). (Michael Bieniosek via cutting) + 4. HADOOP-1342. In aggregators, permit one to limit the number of + unique values per key. (Runping Qi via cutting) + Branch 0.13 (unreleased changes) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java?view=diff&rev=537926&r1=537925&r2=537926 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java Mon May 14 11:00:10 2007 @@ -29,17 +29,49 @@ */ public class UniqValueCount implements ValueAggregator { - TreeMap<Object, Object> uniqItems = null; + private TreeMap<Object, Object> uniqItems = null; + + private long numItems = 0; + + private long maxNumItems = Long.MAX_VALUE; /** * the default constructor * */ public UniqValueCount() { + this(Long.MAX_VALUE); + } + + /** + * constructor + * @param maxNum the limit in the number of unique values to keep. + * + */ + public UniqValueCount(long maxNum) { uniqItems = new TreeMap<Object, Object>(); + this.numItems = 0; + maxNumItems = Long.MAX_VALUE; + if (maxNum > 0 ) { + this.maxNumItems = maxNum; + } } /** + * Set the limit on the number of unique values + * @param n the desired limit on the number of unique values + * @return the new limit on the number of unique values + */ + public long setMaxItems(long n) { + if (n >= numItems) { + this.maxNumItems = n; + } else if (this.maxNumItems >= this.numItems) { + this.maxNumItems = this.numItems; + } + return this.maxNumItems; + } + + /** * add a value to the aggregator * * @param val @@ -47,8 +79,10 @@ * */ public void addNextValue(Object val) { - uniqItems.put(val, "1"); - + if (this.numItems <= this.maxNumItems) { + uniqItems.put(val, "1"); + this.numItems = this.uniqItems.size(); + } } /** Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java?view=diff&rev=537926&r1=537925&r2=537926 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java Mon May 14 11:00:10 2007 @@ -46,7 +46,8 @@ static public final String STRING_VALUE_MIN = "StringValueMin"; - + private static long maxNumItems = Long.MAX_VALUE; + public String inputFile = null; private static class MyEntry implements Entry { @@ -106,7 +107,7 @@ } else if (type.compareToIgnoreCase(DOUBLE_VALUE_SUM) == 0) { retv = new DoubleValueSum(); } else if (type.compareToIgnoreCase(UNIQ_VALUE_COUNT) == 0) { - retv = new UniqValueCount(); + retv = new UniqValueCount(maxNumItems); } else if (type.compareToIgnoreCase(VALUE_HISTOGRAM) == 0) { retv = new ValueHistogram(); } @@ -153,5 +154,6 @@ */ public void configure(JobConf job) { this.inputFile = job.get("map.input.file"); + maxNumItems = job.getLong("aggregate.max.num.unique.values", Long.MAX_VALUE); } } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestAggregates.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestAggregates.java?view=diff&rev=537926&r1=537925&r2=537926 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestAggregates.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestAggregates.java Mon May 14 11:00:10 2007 @@ -69,7 +69,7 @@ } expectedOutput.append("value_as_string_max\t9\n"); expectedOutput.append("value_as_string_min\t1\n"); - expectedOutput.append("uniq_count\t19\n"); + expectedOutput.append("uniq_count\t15\n"); fileOut.write(inputData.toString().getBytes("utf-8")); @@ -96,6 +96,7 @@ job.setInt("aggregator.descriptor.num", 1); job.set("aggregator.descriptor.0", "UserDefined,org.apache.hadoop.mapred.lib.aggregate.AggregatorTests"); + job.setLong("aggregate.max.num.unique.values", 14); JobClient.runJob(job);