Author: cdouglas
Date: Wed Apr 29 04:10:49 2009
New Revision: 769643
URL: http://svn.apache.org/viewvc?rev=769643&view=rev
Log:
HADOOP-5657. Validate data in TestReduceFetch to improve merge test coverage.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=769643&r1=769642&r2=769643&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 29 04:10:49 2009
@@ -265,6 +265,9 @@
HADOOP-5734. Correct block placement policy description in HDFS
Design document. (Konstantin Boudnik via shv)
+ HADOOP-5657. Validate data in TestReduceFetch to improve merge test
+ coverage. (cdouglas)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=769643&r1=769642&r2=769643&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Wed Apr 29 04:10:49 2009
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.Arrays;
+import java.util.Formatter;
+import java.util.Iterator;
import junit.framework.Test;
import junit.framework.TestCase;
@@ -32,6 +34,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -56,34 +59,169 @@
return setup;
}
- public static class MapMB
- implements Mapper<NullWritable,NullWritable,Text,Text> {
+ private static final String tagfmt = "%04d";
+ private static final String keyfmt = "KEYKEYKEYKEYKEYKEYKE";
+ private static final int keylen = keyfmt.length();
+ private static int getValLen(int id, int nMaps) {
+ return 4096 / nMaps * (id + 1);
+ }
+
+ /**
+ * Emit 4096 small keys and 2 "tagged" keys per map. Emits a fixed amount of
+ * data so the in-memory fetch semantics can be tested.
+ */
+ public static class MapMB implements
+ Mapper<NullWritable,NullWritable,Text,Text> {
+
+ private int id;
+ private int nMaps;
+ private final Text key = new Text();
+ private final Text val = new Text();
+ private final byte[] b = new byte[4096];
+ private final Formatter fmt = new Formatter(new StringBuilder(25));
+
+ @Override
+ public void configure(JobConf conf) {
+ nMaps = conf.getNumMapTasks();
+ id = nMaps - conf.getInt("mapred.task.partition", -1) - 1;
+ Arrays.fill(b, 0, 4096, (byte)'V');
+ ((StringBuilder)fmt.out()).append(keyfmt);
+ }
+
+ @Override
public void map(NullWritable nk, NullWritable nv,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
- Text key = new Text();
- Text val = new Text();
- key.set("KEYKEYKEYKEYKEYKEYKEYKEY");
- byte[] b = new byte[1000];
- Arrays.fill(b, (byte)'V');
- val.set(b);
- b = null;
- for (int i = 0; i < 4 * 1024; ++i) {
+ // Emit 4096 fixed-size records
+ val.set(b, 0, 1000);
+ val.getBytes()[0] = (byte) id;
+ for (int i = 0; i < 4096; ++i) {
+ key.set(fmt.format(tagfmt, i).toString());
output.collect(key, val);
+ ((StringBuilder)fmt.out()).setLength(keylen);
}
+
+ // Emit two "tagged" records from the map. To validate the merge, segments
+ // should have both a small and large record such that reading a large
+ // record from an on-disk segment into an in-memory segment will write
+ // over the beginning of a record in the in-memory segment, causing the
+ // merge and/or validation to fail.
+
+ // Add small, tagged record
+ val.set(b, 0, getValLen(id, nMaps) - 128);
+ val.getBytes()[0] = (byte) id;
+ ((StringBuilder)fmt.out()).setLength(keylen);
+ key.set("A" + fmt.format(tagfmt, id).toString());
+ output.collect(key, val);
+ // Add large, tagged record
+ val.set(b, 0, getValLen(id, nMaps));
+ val.getBytes()[0] = (byte) id;
+ ((StringBuilder)fmt.out()).setLength(keylen);
+ key.set("B" + fmt.format(tagfmt, id).toString());
+ output.collect(key, val);
}
- public void configure(JobConf conf) { }
+
+ @Override
public void close() throws IOException { }
}
+ /**
+ * Confirm that each small key is emitted once by each map, that each tagged
+ * key is emitted by only one map, that all IDs are consistent with the record
+ * data, and that all non-ID record data is consistent.
+ */
+ public static class MBValidate
+ implements Reducer<Text,Text,Text,Text> {
+
+ private static int nMaps;
+ private static final Text vb = new Text();
+ static {
+ byte[] v = new byte[4096];
+ Arrays.fill(v, (byte)'V');
+ vb.set(v);
+ }
+
+ private int nRec = 0;
+ private int nKey = -1;
+ private int aKey = -1;
+ private int bKey = -1;
+ private final Text kb = new Text();
+ private final Formatter fmt = new Formatter(new StringBuilder(25));
+
+ @Override
+ public void configure(JobConf conf) {
+ nMaps = conf.getNumMapTasks();
+ ((StringBuilder)fmt.out()).append(keyfmt);
+ }
+
+ @Override
+ public void reduce(Text key, Iterator<Text> values,
+ OutputCollector<Text,Text> out, Reporter reporter)
+ throws IOException {
+ int vc = 0;
+ final int vlen;
+ final int preRec = nRec;
+ final int vcCheck, recCheck;
+ ((StringBuilder)fmt.out()).setLength(keylen);
+ if (25 == key.getLength()) {
+ // tagged record
+ recCheck = 1; // expect only 1 record
+ switch ((char)key.getBytes()[0]) {
+ case 'A':
+ vlen = getValLen(++aKey, nMaps) - 128;
+ vcCheck = aKey; // expect eq id
+ break;
+ case 'B':
+ vlen = getValLen(++bKey, nMaps);
+ vcCheck = bKey; // expect eq id
+ break;
+ default:
+ vlen = vcCheck = -1;
+ fail("Unexpected tag on record: " + ((char)key.getBytes()[24]));
+ }
+ kb.set((char)key.getBytes()[0] + fmt.format(tagfmt, vcCheck).toString());
+ } else {
+ kb.set(fmt.format(tagfmt, ++nKey).toString());
+ vlen = 1000;
+ recCheck = nMaps; // expect 1 rec per map
+ vcCheck = (nMaps * (nMaps - 1)) >>> 1; // expect eq sum(id)
+ }
+ assertEquals(kb, key);
+ while (values.hasNext()) {
+ final Text val = values.next();
+ // increment vc by map ID assoc w/ val
+ vc += val.getBytes()[0];
+ // verify that all the fixed characters 'V' match
+ assertEquals(0, WritableComparator.compareBytes(
+ vb.getBytes(), 1, vlen - 1,
+ val.getBytes(), 1, val.getLength() - 1));
+ out.collect(key, val);
+ ++nRec;
+ }
+ assertEquals("Bad rec count for " + key, recCheck, nRec - preRec);
+ assertEquals("Bad rec group for " + key, vcCheck, vc);
+ }
+
+ @Override
+ public void close() throws IOException {
+ assertEquals(4095, nKey);
+ assertEquals(nMaps - 1, aKey);
+ assertEquals(nMaps - 1, bKey);
+ assertEquals("Bad record count", nMaps * (4096 + 2), nRec);
+ }
+ }
+
public static Counters runJob(JobConf conf) throws Exception {
conf.setMapperClass(MapMB.class);
- conf.setReducerClass(IdentityReducer.class);
+ conf.setReducerClass(MBValidate.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setNumReduceTasks(1);
conf.setInputFormat(FakeIF.class);
+ conf.setNumTasksToExecutePerJvm(1);
+ conf.setInt("mapred.map.max.attempts", 0);
+ conf.setInt("mapred.reduce.max.attempts", 0);
FileInputFormat.setInputPaths(conf, new Path("/in"));
final Path outp = new Path("/out");
FileOutputFormat.setOutputPath(conf, outp);
@@ -100,10 +238,15 @@
return job.getCounters();
}
+ /** Verify that all segments are read from disk */
public void testReduceFromDisk() throws Exception {
JobConf job = mrCluster.createJobConf();
job.set("mapred.job.reduce.input.buffer.percent", "0.0");
- job.setNumMapTasks(3);
+ job.setNumMapTasks(8);
+ job.set("mapred.child.java.opts", "-Xmx128m");
+ job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
+ job.setInt("io.sort.factor", 2);
+ job.setInt("mapred.inmem.merge.threshold", 4);
Counters c = runJob(job);
final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
@@ -114,6 +257,7 @@
hdfsWritten <= localRead);
}
+ /** Verify that at least one segment does not hit disk */
public void testReduceFromPartialMem() throws Exception {
JobConf job = mrCluster.createJobConf();
job.setNumMapTasks(5);
@@ -123,7 +267,6 @@
job.setInt("io.sort.mb", 10);
job.set("mapred.child.java.opts", "-Xmx128m");
job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
- job.setNumTasksToExecutePerJvm(1);
job.set("mapred.job.shuffle.merge.percent", "1.0");
Counters c = runJob(job);
final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
@@ -135,6 +278,7 @@
hdfsWritten >= localRead + 1024 * 1024);
}
+ /** Verify that no segment hits disk. */
public void testReduceFromMem() throws Exception {
JobConf job = mrCluster.createJobConf();
job.set("mapred.job.reduce.input.buffer.percent", "1.0");