Author: ryota
Date: Sat Aug 31 08:45:42 2013
New Revision: 1519159
URL: http://svn.apache.org/r1519159
Log:
OOZIE-1372 When using uber mode, Oozie should also make the AM container size
larger (ryota)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1519159&r1=1519158&r2=1519159&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
Sat Aug 31 08:45:42 2013
@@ -36,6 +36,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -89,6 +91,15 @@ public class JavaActionExecutor extends
public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
private static final String HADOOP_YARN_UBER_MODE =
"mapreduce.job.ubertask.enable";
public static final String OOZIE_ACTION_SHIP_LAUNCHER_JAR =
"oozie.action.ship.launcher.jar";
+ public static final String HADOOP_MAP_MEMORY_MB =
"mapreduce.map.memory.mb";
+ public static final String HADOOP_CHILD_JAVA_OPTS =
"mapred.child.java.opts";
+ public static final String HADOOP_MAP_JAVA_OPTS =
"mapreduce.map.java.opts";
+ public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
+ public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
+ public static final String YARN_AM_RESOURCE_MB =
"yarn.app.mapreduce.am.resource.mb";
+ public static final String YARN_AM_COMMAND_OPTS =
"yarn.app.mapreduce.am.command-opts";
+ public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
+ public static final int YARN_MEMORY_MB_MIN = 512;
private boolean useLauncherJar;
private static int maxActionOutputLen;
private static int maxExternalStatsSize;
@@ -99,6 +110,7 @@ public class JavaActionExecutor extends
private static final String FAILED_KILLED = "FAILED/KILLED";
private static final String RUNNING = "RUNNING";
protected XLog log = XLog.getLog(getClass());
+ private static final Pattern heapPattern =
Pattern.compile("-Xmx(([0-9]+)[mMgG])");
static {
DISALLOWED_PROPERTIES.add(HADOOP_USER);
@@ -263,10 +275,95 @@ public class JavaActionExecutor extends
if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
if
(getOozieConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
false)) {
launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
+ updateConfForUberMode(launcherConf);
}
}
}
+    /**
+     * Raises the MapReduce ApplicationMaster (AM) settings of the launcher job so that they
+     * also cover the settings requested for the launcher map task. Invoked when uber mode is
+     * switched on for the launcher job (presumably because the launcher map then shares the
+     * AM's container and JVM -- confirm against the Hadoop uber-mode documentation).
+     * <p>
+     * Three AM properties are rewritten in place:
+     * <ul>
+     * <li>{@link #YARN_AM_RESOURCE_MB}: max(launcher map memory, AM memory) plus
+     * {@link #YARN_MEMORY_MB_MIN}; both inputs default to 1536 when unset.</li>
+     * <li>{@link #YARN_AM_COMMAND_OPTS}: the AM opts, then the launcher map opts, then a final
+     * <code>-Xmx</code> computed from the larger of the two heap sizes plus
+     * {@link #YARN_MEMORY_MB_MIN}.</li>
+     * <li>{@link #YARN_AM_ENV}: the AM env, then the launcher map env, separated by a comma.</li>
+     * </ul>
+     * For opts and env, the <code>mapreduce.map.*</code> property is preferred and the
+     * <code>mapred.child.*</code> property is used only when the former is not set.
+     *
+     * @param launcherConf launcher job configuration; read and modified in place
+     */
+    void updateConfForUberMode(Configuration launcherConf) {
+
+        // memory.mb
+        int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
+        int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
+        // YARN_MEMORY_MB_MIN to provide buffer.
+        // suppose launcher map aggressively use high memory, need some headroom for AM
+        int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
+        // limit to 4096 in case of 32 bit; not applied when either input already asks for 4096 or more
+        if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
+            memoryMB = 4096;
+        }
+        launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
+
+        // child.java.opts
+        String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS);
+        if (launcherMapOpts == null) {
+            launcherMapOpts = launcherConf.get(HADOOP_CHILD_JAVA_OPTS);
+        }
+        String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
+        int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
+        int heapSizeForAm = extractHeapSizeMB(amChildOpts);
+        int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
+        // limit to 3584 in case of 32 bit; not applied when either input already asks for 4096 or more
+        if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
+            heapSize = 3584;
+        }
+        // The calculated total heap size goes last so that it is the -Xmx that takes effect.
+        // It is skipped when it is not positive (e.g. after int overflow on an absurd -Xmx value).
+        String calculatedHeapOpt = (heapSize > 0) ? "-Xmx" + heapSize + "m" : null;
+        // Join only the parts that are present, separated by single blanks, so that the value
+        // never starts with a blank or contains an empty slot when a part is missing.
+        StringBuilder optsStr = new StringBuilder();
+        for (String part : new String[] { amChildOpts, launcherMapOpts, calculatedHeapOpt }) {
+            if (part == null || part.trim().length() == 0) {
+                continue;
+            }
+            if (optsStr.length() > 0) {
+                optsStr.append(' ');
+            }
+            optsStr.append(part.trim());
+        }
+        launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString());
+
+        // child.env
+        String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
+        if (launcherMapEnv == null) {
+            launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
+        }
+        String envForAm = launcherConf.get(YARN_AM_ENV);
+        StringBuilder envStr = new StringBuilder();
+        if (envForAm != null) {
+            envStr.append(envForAm);
+        }
+        if (launcherMapEnv != null) {
+            // the comma is only needed when there is an AM env in front of the map env
+            if (envForAm != null) {
+                envStr.append(",");
+            }
+            envStr.append(launcherMapEnv);
+        }
+        launcherConf.set(YARN_AM_ENV, envStr.toString());
+    }
+
+ public int extractHeapSizeMB(String input) {
+ int ret = 0;
+ if(input == null || input.equals(""))
+ return ret;
+ Matcher m = heapPattern.matcher(input);
+ String heapStr = null;
+ String heapNum = null;
+ // Grabs the last match which takes effect (in case that multiple Xmx
options specified)
+ while (m.find()) {
+ heapStr = m.group(1);
+ heapNum = m.group(2);
+ }
+ if (heapStr != null) {
+ // when Xmx specified in Gigabyte
+ if(heapStr.endsWith("g") || heapStr.endsWith("G")) {
+ ret = Integer.parseInt(heapNum) * 1024;
+ } else {
+ ret = Integer.parseInt(heapNum);
+ }
+ }
+ return ret;
+ }
+
public static void parseJobXmlAndConfiguration(Context context, Element
element, Path appPath, Configuration conf)
throws IOException, ActionExecutorException,
HadoopAccessorException, URISyntaxException {
Namespace ns = element.getNamespace();
@@ -983,7 +1080,7 @@ public class JavaActionExecutor extends
*
* @param context
* @param jobConf
- * @return
+ * @return JobClient
* @throws HadoopAccessorException
*/
protected JobClient createJobClient(Context context, JobConf jobConf)
throws HadoopAccessorException {
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java?rev=1519159&r1=1519158&r2=1519159&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
Sat Aug 31 08:45:42 2013
@@ -30,6 +30,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -1486,7 +1487,8 @@ public class TestJavaActionExecutor exte
}
public void testInjectLauncherUseUberMode() throws Exception {
- // TODO: Delete these two lines once uber mode is set back to the
default (OOZIE-1385)
+ // TODO: Delete these two lines once uber mode is set back to the
+ // default (OOZIE-1385)
assertFalse(Services.get().getConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
true));
Services.get().getConf().setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
true);
@@ -1511,7 +1513,8 @@ public class TestJavaActionExecutor exte
jae.injectLauncherUseUberMode(conf);
assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
- // disable at oozie-site level (default is to be enabled) -- redo
above tests
+ // disable at oozie-site level (default is to be enabled) -- redo above
+ // tests
Services.get().getConf().setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
false);
// default -- should not set
@@ -1535,6 +1538,116 @@ public class TestJavaActionExecutor exte
assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
}
+ public void testUpdateConfForUberMode() throws Exception {
+ // With uber mode enabled in the Oozie conf, runs setupLauncherConf on three action XMLs and checks the resulting AM memory, AM command-opts and AM env.
+
Services.get().getConf().setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
true);
+ // Scenario 1: only launcher map settings are given (memory 2048, -Xmx2048m, env A=foo); no AM settings.
+ Element actionXml1 = XmlUtils
+ .parseXml("<java>"
+ + "<job-tracker>"
+ + getJobTrackerUri()
+ + "</job-tracker>"
+ + "<name-node>"
+ + getNameNodeUri()
+ + "</name-node>"
+ + "<configuration>"
+ +
"<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>2048</value></property>"
+ +
"<property><name>oozie.launcher.mapred.child.java.opts</name>"
+ + "<value>-Xmx2048m
-Djava.net.preferIPv4Stack=true</value></property>"
+ +
"<property><name>oozie.launcher.mapred.child.env</name><value>A=foo</value></property>"
+ + "</configuration>" +
"<main-class>MAIN-CLASS</main-class>" + "</java>");
+ JavaActionExecutor ae = new JavaActionExecutor();
+ XConfiguration protoConf = new XConfiguration();
+ protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+
+ WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
+ WorkflowActionBean action = (WorkflowActionBean)
wf.getActions().get(0);
+ action.setType(ae.getType());
+
+ Context context = new Context(wf, action);
+ JobConf launcherConf = ae.createBaseHadoopConf(context, actionXml1);
+ ae.setupLauncherConf(launcherConf, actionXml1, getFsTestCaseDir(),
context);
+ // memoryMB (2048 + 512)
+ assertEquals("2560",
launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
+ // heap size in AM command-opts (2048 + 512). NOTE(review): heapSize is assigned here but never asserted in this scenario.
+ int heapSize =
ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
+ assertEquals("-Xmx2048m -Djava.net.preferIPv4Stack=true -Xmx2560m",
+
launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
+
+ // env (no am.env given, so it equals mapred.child.env)
+ assertEquals("A=foo",
launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
+ // Scenario 2: AM and launcher map settings are both given; map.java.opts (-Xmx2560m) is expected to be used instead of child.java.opts (-Xmx1536m).
+ Element actionXml2 = XmlUtils
+ .parseXml("<java>"
+ + "<job-tracker>"
+ + getJobTrackerUri()
+ + "</job-tracker>"
+ + "<name-node>"
+ + getNameNodeUri()
+ + "</name-node>"
+ + "<configuration>"
+ +
"<property><name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name><value>3072</value></property>"
+ +
"<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>2048</value></property>"
+ +
"<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
+ + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true
</value></property>"
+ +
"<property><name>oozie.launcher.mapred.child.java.opts</name><value>-Xmx1536m</value></property>"
+ +
"<property><name>oozie.launcher.mapreduce.map.java.opts</name>"
+ + "<value>-Xmx2560m -XX:NewRatio=8</value></property>"
+ +
"<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name><value>A=foo</value></property>"
+ +
"<property><name>oozie.launcher.mapred.child.env</name><value>B=bar</value></property>"
+ + "</configuration>" +
"<main-class>MAIN-CLASS</main-class>" + "</java>");
+
+ launcherConf = ae.createBaseHadoopConf(context, actionXml2);
+ ae.setupLauncherConf(launcherConf, actionXml2, getFsTestCaseDir(),
context);
+
+ // memoryMB (3072 + 512)
+ assertEquals("3584",
launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
+
+ // heap size (2560 + 512); the appended -Xmx3072m is the last one and therefore the one extracted
+ heapSize =
ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
+ assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx2560m
-XX:NewRatio=8 -Xmx3072m",
+
launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
+ assertEquals(3072, heapSize);
+
+ // env (equals am.env followed by mapred.child.env, comma-separated)
+
assertTrue(launcherConf.get(JavaActionExecutor.YARN_AM_ENV).trim().equals("A=foo,B=bar"));
+
+ // Scenario 3: test limit is applied in case of 32 bit (map memory 4000 and -Xmx4000m, both below 4096)
+ Element actionXml3 = XmlUtils
+ .parseXml("<java>"
+ + "<job-tracker>"
+ + getJobTrackerUri()
+ + "</job-tracker>"
+ + "<name-node>"
+ + getNameNodeUri()
+ + "</name-node>"
+ + "<configuration>"
+ +
"<property><name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name><value>3072</value></property>"
+ +
"<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>4000</value></property>"
+ +
"<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
+ + "<value>-Xmx1024m
-Djava.net.preferIPv4Stack=true</value></property>"
+ +
"<property><name>oozie.launcher.mapred.child.java.opts</name><value>-Xmx1536m</value></property>"
+ +
"<property><name>oozie.launcher.mapreduce.map.java.opts</name>"
+ + "<value>-Xmx4000m -XX:NewRatio=8</value></property>"
+ +
"<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name><value>A=foo</value></property>"
+ +
"<property><name>oozie.launcher.mapred.child.env</name><value>B=bar</value></property>"
+ + "</configuration>" +
"<main-class>MAIN-CLASS</main-class>" + "</java>");
+
+ launcherConf = ae.createBaseHadoopConf(context, actionXml3);
+ ae.setupLauncherConf(launcherConf, actionXml3, getFsTestCaseDir(),
context);
+
+ // memoryMB (4000 + 512 limited to 4096)
+ assertEquals("4096",
launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
+
+ // heap size (4000 + 512 limited to 3584). NOTE(review): heapSize is assigned here but never asserted in this scenario.
+ heapSize =
ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
+ assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx4000m
-XX:NewRatio=8 -Xmx3584m",
+
launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
+
+ // env (equals am.env followed by mapred.child.env, comma-separated)
+ assertEquals("A=foo,B=bar",
launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
+ }
+
public void testAddToCache() throws Exception {
JavaActionExecutor ae = new JavaActionExecutor();
Configuration conf = new XConfiguration();
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1519159&r1=1519158&r2=1519159&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Sat Aug 31 08:45:42 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1372 When using uber mode, Oozie should also make the AM container size
larger (ryota)
OOZIE-615 Support high availability for the Oozie service (rkanter)
OOZIE-1486 Cut down on number of small files created to track a running action
(mona)
OOZIE-1476 Add ability to issue kill on Coordinator Action directly with id
and nominal daterange (mona)