Author: omalley
Date: Tue Apr 29 15:47:38 2008
New Revision: 652182
URL: http://svn.apache.org/viewvc?rev=652182&view=rev
Log:
Merge -r 652178:652179 from trunk to branch-0.17 to fix HADOOP-3280.
Added:
hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java
- copied unchanged from r652179, hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UlimitApp.java
Modified:
hadoop/core/branches/branch-0.17/CHANGES.txt
hadoop/core/branches/branch-0.17/conf/hadoop-default.xml
hadoop/core/branches/branch-0.17/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java
hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/cluster_setup.xml
hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/TaskRunner.java
hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/pipes/Application.java
hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/util/Shell.java
Modified: hadoop/core/branches/branch-0.17/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/CHANGES.txt?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.17/CHANGES.txt Tue Apr 29 15:47:38 2008
@@ -92,6 +92,11 @@
HADOOP-3266. Removed HOD changes from CHANGES.txt, as they are now inside
src/contrib/hod (Hemanth Yamijala via ddas)
+ HADOOP-3280. Separate the configuration of the virtual memory size
+ (mapred.child.ulimit) from the jvm heap size, so that 64 bit
+ streaming applications are supported even when running with 32 bit
+ jvms. (acmurthy via omalley)
+
NEW FEATURES
HADOOP-1398. Add HBase in-memory block cache. (tomwhite)
Modified: hadoop/core/branches/branch-0.17/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/conf/hadoop-default.xml?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/conf/hadoop-default.xml (original)
+++ hadoop/core/branches/branch-0.17/conf/hadoop-default.xml Tue Apr 29 15:47:38 2008
@@ -736,8 +736,23 @@
For example, to enable verbose gc logging to a file named for the taskid in
/tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
- The value of -Xmx will also directly influence the amount of virtual memory
- that a streaming/pipes task gets during execution.
+
+ The configuration variable mapred.child.ulimit can be used to control the
+ maximum virtual memory of the child processes.
+ </description>
+</property>
+
+<property>
+ <name>mapred.child.ulimit</name>
+ <value></value>
+ <description>The maximum virtual memory, in KB, of a process launched by the
+ Map-Reduce framework. This can be used to control both the Mapper/Reducer
+ tasks and applications using Hadoop Pipes, Hadoop Streaming etc.
+ By default it is left unspecified to let cluster admins control it via
+ limits.conf and other such relevant mechanisms.
+
+ Note: mapred.child.ulimit must be greater than or equal to the -Xmx passed to
+ JavaVM, else the VM might not start.
</description>
</property>
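
With this change the heap size and the virtual-memory limit are configured
independently. As a minimal sketch (assuming the 0.17 jars on the classpath;
the values below are examples, not defaults), a job driver would set them
like so:

    import org.apache.hadoop.mapred.JobConf;

    public class UlimitConfSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Heap size for the child JVM; no longer implies a memory ulimit.
        conf.set("mapred.child.java.opts", "-Xmx512m");
        // Virtual-memory cap in KB; must be >= the -Xmx value above.
        conf.set("mapred.child.ulimit", "786432");   // 768MB
      }
    }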
Modified: hadoop/core/branches/branch-0.17/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/branches/branch-0.17/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Apr 29 15:47:38 2008
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.io.Text;
@@ -165,23 +166,11 @@
addEnvironment(childEnv, job_.get("stream.addenvironment"));
// add TMPDIR environment variable with the value of java.io.tmpdir
envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));
- if (StreamUtil.isCygwin()) {
- sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
- } else {
- List<String> cmd = new ArrayList<String>();
- for (String arg : argvSplit) {
- cmd.add(arg);
- }
- // set memory limit using ulimit.
- ProcessBuilder builder;
- List<String> setup = new ArrayList<String>();
- setup.add("ulimit");
- setup.add("-v");
- setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
- builder = new ProcessBuilder(wrapCommand(setup, cmd));
- builder.environment().putAll(childEnv.toMap());
- sim = builder.start();
- }
+
+ // Start the process
+ ProcessBuilder builder = new ProcessBuilder(argvSplit);
+ builder.environment().putAll(childEnv.toMap());
+ sim = builder.start();
clientOut_ = new DataOutputStream(new
BufferedOutputStream(sim.getOutputStream()));
clientIn_ = new DataInputStream(new
BufferedInputStream(sim.getInputStream()));
@@ -196,29 +185,6 @@
throw new RuntimeException("configuration exception", e);
}
}
-
- /**
- * Wrap command with bash -c with setup commands.
- * Setup commands such as setting memory limit can be passed which
- * will be executed before exec.
- * @param setup The setup commands for the execed process.
- * @param cmd The command and the arguments that should be run
- * @return the modified command that should be run
- */
- private List<String> wrapCommand( List<String> setup,
- List<String> cmd
- ) throws IOException {
- List<String> result = new ArrayList<String>();
- result.add("bash");
- result.add("-c");
- StringBuffer mergedCmd = new StringBuffer();
- mergedCmd.append(TaskLog.addCommand(setup, false));
- mergedCmd.append(";");
- mergedCmd.append("exec ");
- mergedCmd.append(TaskLog.addCommand(cmd, true));
- result.add(mergedCmd.toString());
- return result;
- }
void setStreamJobDetails(JobConf job) {
jobLog_ = job.get("stream.jobLog_");
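
For reference, the deleted wrapCommand() built a "bash -c" invocation of
roughly the shape sketched below; this sketch omits the quoting and log
capture that TaskLog.addCommand performed:

    import java.util.ArrayList;
    import java.util.List;

    public class WrapCommandSketch {
      // Approximate shape of the removed wrapper: the ulimit setup ran
      // first, then exec replaced the shell with the real task process.
      static List<String> wrap(String setup, String cmd) {
        List<String> result = new ArrayList<String>();
        result.add("bash");
        result.add("-c");
        result.add(setup + "; exec " + cmd);
        return result;
      }

      public static void main(String[] args) {
        long virtualKB = Runtime.getRuntime().maxMemory() / 1024;
        // Prints e.g. [bash, -c, ulimit -v 1048576; exec /bin/cat]
        System.out.println(wrap("ulimit -v " + virtualKB, "/bin/cat"));
      }
    }

Removing this wrapper is the point of the fix: the streaming child is no
longer capped at a limit derived from the task JVM's own -Xmx.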
Modified: hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java (original)
+++ hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/CatApp.java Tue Apr 29 15:47:38 2008
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-
-/** A minimal Java implementation of /bin/cat
- * The class also tries to allocate a huge array( 10MB) to test ulimits.
- * Look at {@link TestUlimit}
- */
-public class CatApp {
- public static void main(String args[]) throws IOException{
- char s[] = null;
- try {
- s = new char[10*1024*1024];
- BufferedReader in = new BufferedReader(
- new InputStreamReader(System.in));
- String line;
- while ((line = in.readLine()) != null) {
- System.out.println(line);
- }
- } finally {
- if (s == null) {
- System.exit(-1);
- }
- }
- }
-}
Modified: hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/core/branches/branch-0.17/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Tue Apr 29 15:47:38 2008
@@ -41,9 +41,6 @@
* is expected to be a failure.
*/
public class TestUlimit extends TestCase {
- private static final Log LOG =
- LogFactory.getLog(TestUlimit.class.getName());
- enum RESULT { FAILURE, SUCCESS };
String input = "the dummy input";
Path inputPath = new Path("/testing/in");
Path outputPath = new Path("/testing/out");
@@ -51,6 +48,7 @@
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fs = null;
+ private static String SET_MEMORY_LIMIT = "786432"; // 768MB
String[] genArgs(String memLimit) {
return new String[] {
@@ -59,10 +57,11 @@
"-mapper", map,
"-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
"-numReduceTasks", "0",
- "-jobconf", "mapred.child.java.opts=" + memLimit,
+ "-jobconf", "mapred.child.ulimit=" + memLimit,
"-jobconf", "mapred.job.tracker=" + "localhost:" +
mr.getJobTrackerPort(),
- "-jobconf", "fs.default.name=" + "localhost:" + dfs.getNameNodePort(),
+ "-jobconf", "fs.default.name=" + "hdfs://localhost:"
+ + dfs.getNameNodePort(),
"-jobconf", "stream.tmpdir=" +
System.getProperty("test.build.data","/tmp")
};
@@ -87,12 +86,10 @@
mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
writeInputFile(fs, inputPath);
- map = StreamUtil.makeJavaCommand(CatApp.class, new String[]{});
- runProgram("-Xmx2048m", RESULT.SUCCESS);
+ map = StreamUtil.makeJavaCommand(UlimitApp.class, new String[]{});
+ runProgram(SET_MEMORY_LIMIT);
FileUtil.fullyDelete(fs, outputPath);
assertFalse("output not cleaned up", fs.exists(outputPath));
- // 100MB is not sufficient for launching jvm. This launch should fail.
- runProgram("-Xmx0.5m", RESULT.FAILURE);
mr.waitUntilIdle();
} catch(IOException e) {
fail(e.toString());
@@ -114,24 +111,14 @@
* @param result Expected result
* @throws IOException
*/
- private void runProgram(String memLimit, RESULT result
- ) throws IOException {
+ private void runProgram(String memLimit) throws IOException {
boolean mayExit = false;
- int ret = 1;
- try {
- StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
- ret = job.go();
- } catch (IOException ioe) {
- LOG.warn("Job Failed! " + StringUtils.stringifyException(ioe));
- ioe.printStackTrace();
- }
+ StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
+ job.go();
String output = TestMiniMRWithDFS.readOutput(outputPath,
mr.createJobConf());
- if (RESULT.SUCCESS.name().equals(result.name())){
- assertEquals("output is wrong", input, output.trim());
- } else {
- assertTrue("output is correct", !input.equals(output.trim()));
- }
+ assertEquals("output is wrong", SET_MEMORY_LIMIT,
+ output.trim());
}
public static void main(String[]args) throws Exception
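
UlimitApp.java is added above but its body is not part of this diff. For the
assertEquals on SET_MEMORY_LIMIT to hold, the mapper presumably drains its
input and prints the effective "ulimit -v" of the task. A hypothetical
reconstruction, not the committed file:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    public class UlimitApp {
      public static void main(String[] args) throws IOException {
        // Drain the streaming input without echoing it.
        BufferedReader in =
          new BufferedReader(new InputStreamReader(System.in));
        while (in.readLine() != null) { }

        // Report the virtual-memory limit imposed on this task, so the
        // test can compare it against the configured mapred.child.ulimit.
        Process p = Runtime.getRuntime().exec(
            new String[] {"bash", "-c", "ulimit -v"});
        BufferedReader out = new BufferedReader(
            new InputStreamReader(p.getInputStream()));
        String line;
        while ((line = out.readLine()) != null) {
          System.out.println(line);
        }
      }
    }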
Modified: hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/cluster_setup.xml Tue Apr 29 15:47:38 2008
@@ -272,8 +272,7 @@
<td>mapred.child.java.opts</td>
<td>-Xmx512M</td>
<td>
- Larger heap-size for child jvms of maps/reduces. Also controls the amount
- of virtual memory that a streaming/pipes task gets.
+ Larger heap-size for child jvms of maps/reduces.
</td>
</tr>
<tr>
Modified: hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/branches/branch-0.17/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Tue Apr 29 15:47:38 2008
@@ -1065,6 +1065,9 @@
<code></property></code>
</p>
+ <p>Users/admins can also specify the maximum virtual memory
+ of the launched child-task using <code>mapred.child.ulimit</code>.</p>
+
<p>When the job starts, the localized job directory
<code> ${mapred.local.dir}/taskTracker/jobcache/$jobid/</code>
has the following directories: </p>
Modified: hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Apr 29 15:47:38 2008
@@ -24,8 +24,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.filecache.*;
import org.apache.hadoop.util.*;
+
import java.io.*;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.net.URI;
@@ -368,12 +370,22 @@
vargs.add(Integer.toString(address.getPort()));
vargs.add(taskid); // pass task identifier
+ // set memory limit using ulimit if feasible and necessary ...
+ String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+ List<String> setup = null;
+ if (ulimitCmd != null) {
+ setup = new ArrayList<String>();
+ for (String arg : ulimitCmd) {
+ setup.add(arg);
+ }
+ }
+
// Set up the redirection of the task's stdout and stderr streams
File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
stdout.getParentFile().mkdirs();
List<String> wrappedCommand =
- TaskLog.captureOutAndError(vargs, stdout, stderr, logSize);
+ TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
// Run the task as child of the parent TaskTracker process
runChild(wrappedCommand, workDir, taskid);
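
A sketch of how the optional setup list changes the wrapped child command;
the quoting and stdout/stderr redirection that TaskLog.captureOutAndError
performs are elided:

    import java.util.ArrayList;
    import java.util.List;

    public class TaskSetupSketch {
      // Rough shape of the wrapped command: when a ulimit is configured,
      // the setup runs first and exec hands off to the child JVM.
      static List<String> wrapped(String[] ulimitCmd, List<String> vargs) {
        StringBuilder sh = new StringBuilder();
        if (ulimitCmd != null) {           // null on Windows or limit unset
          for (int i = 0; i < ulimitCmd.length; i++) {
            if (i > 0) sh.append(' ');
            sh.append(ulimitCmd[i]);
          }
          sh.append("; ");
        }
        sh.append("exec");
        for (String s : vargs) {
          sh.append(' ').append(s);
        }
        List<String> cmd = new ArrayList<String>();
        cmd.add("bash");
        cmd.add("-c");
        cmd.add(sh.toString());
        return cmd;
      }
    }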
Modified: hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/mapred/pipes/Application.java Tue Apr 29 15:47:38 2008
@@ -84,16 +84,8 @@
File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);
- // set memory limit using ulimit.
- if (!WINDOWS) {
- List<String> setup = new ArrayList<String>();
- setup.add("ulimit");
- setup.add("-v");
- setup.add(String.valueOf(Runtime.getRuntime().maxMemory() / 1024));
- cmd = TaskLog.captureOutAndError(setup, cmd, stdout, stderr, logLength);
- } else {
- cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
- }
+ cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
+
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
handler = new OutputHandler<K2, V2>(output, reporter);
Modified: hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/util/Shell.java?rev=652182&r1=652181&r2=652182&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/util/Shell.java Tue Apr 29 15:47:38 2008
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
/**
* A base class for running a Unix command.
@@ -54,6 +55,39 @@
return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
}
+ /**
+ * Get the Unix command for setting the maximum virtual memory available
+ * to a given child process. This is only relevant when we are forking a
+ * process from within the {@link org.apache.hadoop.mapred.Mapper} or the
+ * {@link org.apache.hadoop.mapred.Reducer} implementations
+ * e.g. <a href="{@docRoot}/org/apache/hadoop/mapred/pipes/package-summary.html">Hadoop Pipes</a>
+ * or <a href="{@docRoot}/org/apache/hadoop/streaming/package-summary.html">Hadoop Streaming</a>.
+ *
+ * It also checks to ensure that we are running on a *nix platform else
+ * (e.g. in Cygwin/Windows) it returns <code>null</code>.
+ * @param job job configuration
+ * @return a <code>String[]</code> with the ulimit command arguments or
+ * <code>null</code> if we are running on a non *nix platform or
+ * if the limit is unspecified.
+ */
+ public static String[] getUlimitMemoryCommand(JobConf job) {
+ // ulimit isn't supported on Windows
+ if (WINDOWS) {
+ return null;
+ }
+
+ // get the memory limit from the JobConf
+ String ulimit = job.get("mapred.child.ulimit");
+ if (ulimit == null) {
+ return null;
+ }
+
+ // Parse it to ensure it is legal/sane
+ int memoryLimit = Integer.valueOf(ulimit);
+
+ return new String[] {"ulimit", "-v", String.valueOf(memoryLimit)};
+ }
+
/** Set to true on Windows platforms */
public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
= System.getProperty("os.name").startsWith("Windows");
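
Usage of the new helper, as a minimal sketch (assumes the 0.17 jars on the
classpath):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Shell;

    public class UlimitCommandDemo {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.set("mapred.child.ulimit", "786432");   // 768MB, in KB
        String[] cmd = Shell.getUlimitMemoryCommand(conf);
        // cmd is null on Windows or when the property is unset;
        // otherwise it is {"ulimit", "-v", "786432"}.
        if (cmd != null) {
          StringBuilder sb = new StringBuilder();
          for (int i = 0; i < cmd.length; i++) {
            if (i > 0) sb.append(' ');
            sb.append(cmd[i]);
          }
          System.out.println(sb);   // ulimit -v 786432
        }
      }
    }

Note that ulimit is a shell builtin, so the returned array is only meaningful
as a setup fragment inside a "bash -c" wrapper, which is how TaskRunner uses
it above.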