Author: omalley
Date: Mon Apr 6 06:25:38 2009
New Revision: 762216
URL: http://svn.apache.org/viewvc?rev=762216&view=rev
Log:
HADOOP-5437. Fix TestMiniMRDFSSort to properly test jvm-reuse. (omalley)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=762216&r1=762215&r2=762216&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Apr 6 06:25:38 2009
@@ -722,6 +722,8 @@
HADOOP-5468. Add sub-menus to forrest documentation and make some minor
edits. (Corinne Chandel via szetszwo)
+ HADOOP-5437. Fix TestMiniMRDFSSort to properly test jvm-reuse. (omalley)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=762216&r1=762215&r2=762216&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Mon Apr 6 06:25:38 2009
@@ -18,10 +18,18 @@
package org.apache.hadoop.mapred;
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
import junit.framework.TestCase;
+import junit.framework.TestSuite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
@@ -39,9 +47,30 @@
// Knobs to control randomwriter; and hence sort
private static final int NUM_HADOOP_SLAVES = 3;
- private static final int RW_BYTES_PER_MAP = 2 * 1024 * 1024;
+ // make it big enough to cause a spill in the map: runSort caps the map-side
+ // sort buffer at 1 MB (io.sort.mb = 1), so each sort map must see more data
+ // than that.
+ private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
private static final int RW_MAPS_PER_HOST = 2;
+ // Clusters and file system shared by every test in this class. They are
+ // created once by the TestSetup wrapper built in suite() and shut down
+ // after the last test.
+ private static MiniMRCluster mrCluster = null;
+ private static MiniDFSCluster dfsCluster = null;
+ private static FileSystem dfs = null;
+ /**
+ * Wraps every test of this class so they all share one mini DFS cluster and
+ * one mini MR cluster, started before the first test and stopped after the
+ * last one.
+ *
+ * @return the decorated test suite
+ */
+ public static Test suite() {
+ TestSetup setup = new TestSetup(new TestSuite(TestMiniMRDFSSort.class)) {
+ protected void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
+ dfs = dfsCluster.getFileSystem();
+ // The MR cluster is pointed at the DFS cluster's namenode URI.
+ mrCluster = new MiniMRCluster(NUM_HADOOP_SLAVES,
+ dfs.getUri().toString(), 1);
+ }
+ protected void tearDown() throws Exception {
+ // Shut down in reverse start-up order: the MR cluster was started on
+ // top of DFS, so stop it first. The finally block makes sure DFS is
+ // still stopped if the MR shutdown throws.
+ try {
+ if (mrCluster != null) { mrCluster.shutdown(); }
+ } finally {
+ if (dfsCluster != null) { dfsCluster.shutdown(); }
+ }
+ }
+ };
+ return setup;
+ }
+
private static void runRandomWriter(JobConf job, Path sortInput)
throws Exception {
// Scale down the default settings for RandomWriter for the test-case
@@ -57,8 +86,10 @@
private static void runSort(JobConf job, Path sortInput, Path sortOutput)
throws Exception {
+ job.setInt("mapred.job.reuse.jvm.num.tasks", -1);
job.setInt("io.sort.mb", 1);
- job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+ job.setNumMapTasks(12);
+
// Setup command-line arguments to 'sort'
String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
@@ -75,34 +106,66 @@
// Run Sort-Validator
assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
}
- Configuration conf = new Configuration();
- public void testMapReduceSort() throws Exception {
- MiniDFSCluster dfs = null;
- MiniMRCluster mr = null;
- FileSystem fileSys = null;
- try {
-
- // Start the mini-MR and mini-DFS clusters
- dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
- fileSys = dfs.getFileSystem();
- mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(),
1);
-
- // Run randomwriter to generate input for 'sort'
- runRandomWriter(mr.createJobConf(), SORT_INPUT_PATH);
-
- // Run sort
- runSort(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
-
- // Run sort-validator to check if sort worked correctly
- runSortValidator(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
- } finally {
- if (dfs != null) { dfs.shutdown(); }
- if (mr != null) { mr.shutdown();
- }
+
+ /**
+ * Mapper that ignores its input. When each task closes, it adds to the
+ * "jvm"/"use" counter the number of tasks this JVM has closed so far,
+ * including the current one. If every JVM runs exactly one task, the job
+ * total equals the number of maps; any JVM reuse makes the total larger.
+ */
+ private static class ReuseDetector extends MapReduceBase
+ implements Mapper<BytesWritable,BytesWritable, Text, Text> {
+ // Static on purpose: the value carries over from one task to the next
+ // when the same JVM runs several tasks.
+ static int instances = 0;
+ private Reporter reporter = null;
+
+ @Override
+ public void map(BytesWritable key, BytesWritable value,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ // Keep the reporter so close() can update the counter.
+ this.reporter = reporter;
+ }
+
+ @Override
+ public void close() throws IOException {
+ ++instances;
+ // The reporter is only captured in map(), so it is still null if this
+ // task received no records. Without it nothing can be reported, but
+ // the task no longer dies with a NullPointerException.
+ if (reporter != null) {
+ reporter.incrCounter("jvm", "use", instances);
+ }
}
}
- public void testMapReduceSortWithJvmReuse() throws Exception {
- conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
- testMapReduceSort();
+
+ /**
+ * Runs a map-only job over SORT_INPUT_PATH whose mapper (ReuseDetector)
+ * adds to the "jvm"/"use" counter. It then checks the counter total against
+ * the number of map tasks the job actually ran.
+ *
+ * @param job job configuration to use; it is modified in place
+ * @param reuse true to allow unlimited JVM reuse (-1), false to run one
+ * task per JVM (1)
+ * @throws Exception if generating the input or running the job fails
+ */
+ private static void runJvmReuseTest(JobConf job,
+ boolean reuse) throws Exception {
+ // The input is shared with testMapReduceSort, and JUnit 3 does not fix
+ // the order of test methods, so create it here if it is not there yet.
+ if (!dfs.exists(SORT_INPUT_PATH)) {
+ runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH);
+ }
+ // setup a map-only job that reads the input and only sets the counters
+ // based on how many times the jvm was reused.
+ job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1);
+ FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setOutputFormat(NullOutputFormat.class);
+ job.setMapperClass(ReuseDetector.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumMapTasks(24);
+ job.setNumReduceTasks(0);
+ RunningJob result = JobClient.runJob(job);
+ long uses = result.getCounters().findCounter("jvm", "use").getValue();
+ // setNumMapTasks() is only a hint; the input splits decide how many maps
+ // really ran, so ask the JobTracker for the map task count.
+ int maps = new JobClient(job).getMapTaskReports(result.getID()).length;
+ System.out.println("maps = " + maps);
+ System.out.println(result.getCounters());
+ if (reuse) {
+ assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
+ } else {
+ assertEquals("uses should be number of maps", maps, uses);
+ }
+ }
+
+ /**
+ * Generates random input with RandomWriter (unless it already exists),
+ * sorts it, and validates the sorted output with SortValidator.
+ */
+ public void testMapReduceSort() throws Exception {
+ // The input directory is shared by every test in this class, and JUnit 3
+ // does not fix test-method order. RandomWriter refuses to write into an
+ // existing output directory, so only generate the input when it is absent.
+ if (!dfs.exists(SORT_INPUT_PATH)) {
+ // Run randomwriter to generate input for 'sort'
+ runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH);
+ }
+
+ // Run sort
+ runSort(mrCluster.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
+
+ // Run sort-validator to check if sort worked correctly
+ runSortValidator(mrCluster.createJobConf(), SORT_INPUT_PATH,
+ SORT_OUTPUT_PATH);
+ }
+
+ /** Runs the JVM-reuse probe job with unlimited reuse (-1) enabled. */
+ public void testJvmReuse() throws Exception {
+ JobConf reuseConf = mrCluster.createJobConf();
+ runJvmReuseTest(reuseConf, true);
+ }
+
+ /** Runs the JVM-reuse probe job with reuse disabled (one task per JVM). */
+ public void testNoJvmReuse() throws Exception {
+ JobConf noReuseConf = mrCluster.createJobConf();
+ runJvmReuseTest(noReuseConf, false);
}
}