Author: omalley
Date: Fri Mar 4 03:51:26 2011
New Revision: 1077202
URL: http://svn.apache.org/viewvc?rev=1077202&view=rev
Log:
commit 2d65ac5b02f05d4a3cc00fcc488835d43cc3fed0
Author: Chris Douglas <[email protected]>
Date: Mon Feb 22 22:32:40 2010 -0800
    MAPREDUCE-433 from https://issues.apache.org/jira/secure/attachment/12436678/M433-1y20.patch
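The change makes the shuffle's memory budget testable: ReduceTask's ShuffleRamManager previously sized its in-memory buffer from Runtime.getRuntime().maxMemory(), i.e. from whatever -Xmx the task JVM happened to get; it now honors mapred.job.reduce.total.mem.bytes when set, and TestReduceFetch asserts on record counters (SPILLED_RECORDS against MAP_OUTPUT_RECORDS) instead of local/HDFS byte counters. A minimal sketch of the pattern the updated tests use (property names as in the patch; cluster setup and job I/O elided):

    JobConf job = mrCluster.createJobConf();
    // Pin the memory the shuffle believes it has, independent of -Xmx:
    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);   // 128MB
    // Grant 5% of that budget to the in-memory shuffle buffer:
    job.set("mapred.job.shuffle.input.buffer.percent", "0.05");
    Counters c = runJob(job);
    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();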
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077202&r1=1077201&r2=1077202&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 4 03:51:26 2011
@@ -1011,9 +1011,10 @@ class ReduceTask extends Task {
         throw new IOException("mapred.job.shuffle.input.buffer.percent" +
                               maxInMemCopyUse);
       }
-      maxSize = (long)Math.min(
-          Runtime.getRuntime().maxMemory() * maxInMemCopyUse,
-          Integer.MAX_VALUE);
+      // Allow unit tests to fix Runtime memory
+      maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
+          (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
+          * maxInMemCopyUse);
       maxSingleShuffleLimit = (long)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
       LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
                ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
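With the memory fixed by the tests below (128 << 20 bytes) and a shuffle buffer percent of 0.05, the new expression becomes deterministic arithmetic; a worked instance, assuming MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f as in this branch:

    // mapred.job.reduce.total.mem.bytes = 128 << 20 = 134217728 bytes
    maxSize = (int)(134217728 * 0.05f);               // ~6710886, i.e. ~6.4MB
    maxSingleShuffleLimit = (long)(maxSize * 0.25f);  // ~1677721, i.e. ~1.6MB per segment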
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=1077202&r1=1077201&r2=1077202&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Fri Mar 4 03:51:26 2011
@@ -101,48 +101,53 @@ public class TestReduceFetch extends Tes
   }
 
   public void testReduceFromDisk() throws Exception {
+    final int MAP_TASKS = 8;
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "0.0");
-    job.setNumMapTasks(3);
+    job.setNumMapTasks(MAP_TASKS);
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+    job.set("mapred.job.shuffle.input.buffer.percent", "0.05");
+    job.setInt("io.sort.factor", 2);
+    job.setInt("mapred.inmem.merge.threshold", 4);
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
-    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("file")[0]).getCounter();
-    assertTrue("Expected more bytes read from local (" +
-        localRead + ") than written to HDFS (" + hdfsWritten + ")",
-        hdfsWritten <= localRead);
+    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
+    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
+    assertTrue("Expected all records spilled during reduce (" + spill + ")",
+        spill >= 2 * out); // all records spill at map, reduce
+    assertTrue("Expected intermediate merges (" + spill + ")",
+        spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
   }
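  // Why the two bounds hold, per the inline comments above: with the reduce
  // input buffer percent at 0.0, every map output record spills once at the
  // map and once again at the reduce, so spill >= 2 * out; io.sort.factor = 2
  // and mapred.inmem.merge.threshold = 4 then force intermediate merges at
  // the reduce, re-spilling at least out / MAP_TASKS records a second time.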
 
  public void testReduceFromPartialMem() throws Exception {
+    final int MAP_TASKS = 7;
     JobConf job = mrCluster.createJobConf();
-    job.setNumMapTasks(5);
+    job.setNumMapTasks(MAP_TASKS);
     job.setInt("mapred.inmem.merge.threshold", 0);
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
     job.setInt("mapred.reduce.parallel.copies", 1);
     job.setInt("io.sort.mb", 10);
-    job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
     job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
     job.setNumTasksToExecutePerJvm(1);
     job.set("mapred.job.shuffle.merge.percent", "1.0");
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
-    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("file")[0]).getCounter();
-    assertTrue("Expected at least 1MB fewer bytes read from local (" +
-        localRead + ") than written to HDFS (" + hdfsWritten + ")",
-        hdfsWritten >= localRead + 1024 * 1024);
+    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
+    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
+    assertTrue("Expected some records not spilled during reduce (" + spill + ")",
+        spill < 2 * out); // spilled map records, some records at the reduce
   }
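  // Why some records stay in memory here: mapred.job.reduce.input.buffer.percent
  // = 1.0 lets the reduce consume retained in-memory segments directly, and
  // with mapred.inmem.merge.threshold = 0 and a merge percent of 1.0 only part
  // of the shuffled data reaches local disk, so spill stays below 2 * out
  // (all map-side spills, but only some records re-spilled at the reduce).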
 
  public void testReduceFromMem() throws Exception {
+    final int MAP_TASKS = 3;
     JobConf job = mrCluster.createJobConf();
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
-    job.setNumMapTasks(3);
+    job.set("mapred.job.shuffle.input.buffer.percent", "1.0");
+    job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+    job.setNumMapTasks(MAP_TASKS);
     Counters c = runJob(job);
-    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-        Task.getFileSystemCounterNames("file")[0]).getCounter();
-    assertTrue("Non-zero read from local: " + localRead, localRead == 0);
+    final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
+    final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
+    assertEquals("Spilled records: " + spill, out, spill); // no reduce spill
   }
 }
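In the last test both buffer percents are 1.0 and the shuffle memory is pinned at 128MB, so map outputs are held and merged entirely in memory on the reduce side; the only spilled records are the map-side ones, which is why the final assertion demands exact equality (spill == out) rather than a bound.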