Author: ryota
Date: Sat Aug 31 08:45:42 2013
New Revision: 1519159

URL: http://svn.apache.org/r1519159
Log:
OOZIE-1372 When using uber mode, Oozie should also make the AM container size 
larger (ryota)

Modified:
    
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
    oozie/trunk/release-log.txt

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1519159&r1=1519158&r2=1519159&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
 Sat Aug 31 08:45:42 2013
@@ -36,6 +36,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -89,6 +91,15 @@ public class JavaActionExecutor extends 
     public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
     private static final String HADOOP_YARN_UBER_MODE = 
"mapreduce.job.ubertask.enable";
     public static final String OOZIE_ACTION_SHIP_LAUNCHER_JAR = 
"oozie.action.ship.launcher.jar";
+    public static final String HADOOP_MAP_MEMORY_MB = 
"mapreduce.map.memory.mb";
+    public static final String HADOOP_CHILD_JAVA_OPTS = 
"mapred.child.java.opts";
+    public static final String HADOOP_MAP_JAVA_OPTS = 
"mapreduce.map.java.opts";
+    public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
+    public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
+    public static final String YARN_AM_RESOURCE_MB = 
"yarn.app.mapreduce.am.resource.mb";
+    public static final String YARN_AM_COMMAND_OPTS = 
"yarn.app.mapreduce.am.command-opts";
+    public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
+    public static final int YARN_MEMORY_MB_MIN = 512;
     private boolean useLauncherJar;
     private static int maxActionOutputLen;
     private static int maxExternalStatsSize;
@@ -99,6 +110,7 @@ public class JavaActionExecutor extends 
     private static final String FAILED_KILLED = "FAILED/KILLED";
     private static final String RUNNING = "RUNNING";
     protected XLog log = XLog.getLog(getClass());
+    private static final Pattern heapPattern = 
Pattern.compile("-Xmx(([0-9]+)[mMgG])");
 
     static {
         DISALLOWED_PROPERTIES.add(HADOOP_USER);
@@ -263,10 +275,95 @@ public class JavaActionExecutor extends 
         if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
             if 
(getOozieConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
 false)) {
                 launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
+                updateConfForUberMode(launcherConf);
             }
         }
     }
 
+    void updateConfForUberMode(Configuration launcherConf) {
+
+        // memory.mb
+        int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 
1536);
+        int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
+        // YARN_MEMORY_MB_MIN to provide buffer.
+        // in case the launcher map aggressively uses high memory, leave some 
headroom for AM
+        int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + 
YARN_MEMORY_MB_MIN;
+        // limit to 4096 in case of 32 bit
+        if(launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096){
+            memoryMB = 4096;
+        }
+        launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
+
+        // child.java.opts
+        String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS);
+        if (launcherMapOpts == null) {
+            launcherMapOpts = launcherConf.get(HADOOP_CHILD_JAVA_OPTS);
+        }
+        String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
+        StringBuffer optsStr = new StringBuffer();
+        int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
+        int heapSizeForAm = extractHeapSizeMB(amChildOpts);
+        int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + 
YARN_MEMORY_MB_MIN;
+        // limit to 3584 in case of 32 bit
+        if(heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
+            heapSize = 3584;
+        }
+        if (amChildOpts != null) {
+            optsStr.append(amChildOpts);
+        }
+        if (launcherMapOpts != null) {
+            optsStr.append(" ");
+            optsStr.append(launcherMapOpts);
+        }
+        if (heapSize > 0) {
+            // append calculated total heap size to the end
+            optsStr.append(" ");
+            optsStr.append("-Xmx" + heapSize + "m");
+        }
+        launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString());
+
+        // child.env
+        String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
+        if (launcherMapEnv == null) {
+            launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
+        }
+        String envForAm = launcherConf.get(YARN_AM_ENV);
+        StringBuffer envStr = new StringBuffer();
+        if (envForAm != null) {
+            envStr.append(envForAm);
+        }
+        if (launcherMapEnv != null) {
+            if (envForAm != null) {
+                envStr.append(",");
+            }
+            envStr.append(launcherMapEnv);
+        }
+        launcherConf.set(YARN_AM_ENV, envStr.toString());
+    }
+
+    public int extractHeapSizeMB(String input) {
+        int ret = 0;
+        if(input == null || input.equals(""))
+            return ret;
+        Matcher m = heapPattern.matcher(input);
+        String heapStr = null;
+        String heapNum = null;
+        // Grab the last match, which is the one that takes effect (in case multiple Xmx 
options are specified)
+        while (m.find()) {
+            heapStr = m.group(1);
+            heapNum = m.group(2);
+        }
+        if (heapStr != null) {
+            // when Xmx specified in Gigabyte
+            if(heapStr.endsWith("g") || heapStr.endsWith("G")) {
+                ret = Integer.parseInt(heapNum) * 1024;
+            } else {
+                ret = Integer.parseInt(heapNum);
+            }
+        }
+        return ret;
+    }
+
     public static void parseJobXmlAndConfiguration(Context context, Element 
element, Path appPath, Configuration conf)
             throws IOException, ActionExecutorException, 
HadoopAccessorException, URISyntaxException {
         Namespace ns = element.getNamespace();
@@ -983,7 +1080,7 @@ public class JavaActionExecutor extends 
      *
      * @param context
      * @param jobConf
-     * @return
+     * @return JobClient
      * @throws HadoopAccessorException
      */
     protected JobClient createJobClient(Context context, JobConf jobConf) 
throws HadoopAccessorException {

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java?rev=1519159&r1=1519158&r2=1519159&view=diff
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
 Sat Aug 31 08:45:42 2013
@@ -30,6 +30,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -1486,7 +1487,8 @@ public class TestJavaActionExecutor exte
     }
 
     public void testInjectLauncherUseUberMode() throws Exception {
-        // TODO: Delete these two lines once uber mode is set back to the 
default (OOZIE-1385)
+        // TODO: Delete these two lines once uber mode is set back to the
+        // default (OOZIE-1385)
         
assertFalse(Services.get().getConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
 true));
         
Services.get().getConf().setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
 true);
 
@@ -1511,7 +1513,8 @@ public class TestJavaActionExecutor exte
         jae.injectLauncherUseUberMode(conf);
         assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
 
-        // disable at oozie-site level (default is to be enabled) -- redo 
above tests
+        // disable at oozie-site level (default is to be enabled) -- redo above
+        // tests
         
Services.get().getConf().setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
 false);
 
         // default -- should not set
@@ -1535,6 +1538,116 @@ public class TestJavaActionExecutor exte
         assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
     }
 
+    public void testUpdateConfForUberMode() throws Exception {
+
+        
Services.get().getConf().setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
 true);
+
+        Element actionXml1 = XmlUtils
+                .parseXml("<java>"
+                        + "<job-tracker>"
+                        + getJobTrackerUri()
+                        + "</job-tracker>"
+                        + "<name-node>"
+                        + getNameNodeUri()
+                        + "</name-node>"
+                        + "<configuration>"
+                        + 
"<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>2048</value></property>"
+                        + 
"<property><name>oozie.launcher.mapred.child.java.opts</name>"
+                        + "<value>-Xmx2048m 
-Djava.net.preferIPv4Stack=true</value></property>"
+                        + 
"<property><name>oozie.launcher.mapred.child.env</name><value>A=foo</value></property>"
+                        + "</configuration>" + 
"<main-class>MAIN-CLASS</main-class>" + "</java>");
+        JavaActionExecutor ae = new JavaActionExecutor();
+        XConfiguration protoConf = new XConfiguration();
+        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+
+        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
+        WorkflowActionBean action = (WorkflowActionBean) 
wf.getActions().get(0);
+        action.setType(ae.getType());
+
+        Context context = new Context(wf, action);
+        JobConf launcherConf = ae.createBaseHadoopConf(context, actionXml1);
+        ae.setupLauncherConf(launcherConf, actionXml1, getFsTestCaseDir(), 
context);
+        // memoryMB (2048 + 512)
+        assertEquals("2560", 
launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
+        // heap size in child.opts (2048 + 512)
+        int heapSize = 
ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
+        assertEquals("-Xmx2048m -Djava.net.preferIPv4Stack=true -Xmx2560m",
+                
launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
+
+        // env
+        assertEquals("A=foo", 
launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
+
+        Element actionXml2 = XmlUtils
+                .parseXml("<java>"
+                        + "<job-tracker>"
+                        + getJobTrackerUri()
+                        + "</job-tracker>"
+                        + "<name-node>"
+                        + getNameNodeUri()
+                        + "</name-node>"
+                        + "<configuration>"
+                        + 
"<property><name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name><value>3072</value></property>"
+                        + 
"<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>2048</value></property>"
+                        + 
"<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
+                        + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true 
</value></property>"
+                        + 
"<property><name>oozie.launcher.mapred.child.java.opts</name><value>-Xmx1536m</value></property>"
+                        + 
"<property><name>oozie.launcher.mapreduce.map.java.opts</name>"
+                        + "<value>-Xmx2560m -XX:NewRatio=8</value></property>"
+                        + 
"<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name><value>A=foo</value></property>"
+                        + 
"<property><name>oozie.launcher.mapred.child.env</name><value>B=bar</value></property>"
+                        + "</configuration>" + 
"<main-class>MAIN-CLASS</main-class>" + "</java>");
+
+        launcherConf = ae.createBaseHadoopConf(context, actionXml2);
+        ae.setupLauncherConf(launcherConf, actionXml2, getFsTestCaseDir(), 
context);
+
+        // memoryMB (3072 + 512)
+        assertEquals("3584", 
launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
+
+        // heap size (2560 + 512)
+        heapSize = 
ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
+        assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx2560m 
-XX:NewRatio=8 -Xmx3072m",
+                
launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
+        assertEquals(3072, heapSize);
+
+        // env (equals am.env + mapred.child.env)
+        
assertTrue(launcherConf.get(JavaActionExecutor.YARN_AM_ENV).trim().equals("A=foo,B=bar"));
+
+        // Test limit is applied in case of 32 bit
+        Element actionXml3 = XmlUtils
+                .parseXml("<java>"
+                        + "<job-tracker>"
+                        + getJobTrackerUri()
+                        + "</job-tracker>"
+                        + "<name-node>"
+                        + getNameNodeUri()
+                        + "</name-node>"
+                        + "<configuration>"
+                        + 
"<property><name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name><value>3072</value></property>"
+                        + 
"<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>4000</value></property>"
+                        + 
"<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
+                        + "<value>-Xmx1024m 
-Djava.net.preferIPv4Stack=true</value></property>"
+                        + 
"<property><name>oozie.launcher.mapred.child.java.opts</name><value>-Xmx1536m</value></property>"
+                        + 
"<property><name>oozie.launcher.mapreduce.map.java.opts</name>"
+                        + "<value>-Xmx4000m -XX:NewRatio=8</value></property>"
+                        + 
"<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name><value>A=foo</value></property>"
+                        + 
"<property><name>oozie.launcher.mapred.child.env</name><value>B=bar</value></property>"
+                        + "</configuration>" + 
"<main-class>MAIN-CLASS</main-class>" + "</java>");
+
+        launcherConf = ae.createBaseHadoopConf(context, actionXml3);
+        ae.setupLauncherConf(launcherConf, actionXml3, getFsTestCaseDir(), 
context);
+
+        // memoryMB (limit to 4096)
+        assertEquals("4096", 
launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
+
+        // heap size (limit to 3584)
+        heapSize = 
ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
+        assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx4000m 
-XX:NewRatio=8 -Xmx3584m",
+                
launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
+
+        // env (equals am.env + mapred.child.env)
+        assertEquals("A=foo,B=bar", 
launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
+    }
+
     public void testAddToCache() throws Exception {
         JavaActionExecutor ae = new JavaActionExecutor();
         Configuration conf = new XConfiguration();

Modified: oozie/trunk/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1519159&r1=1519158&r2=1519159&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Sat Aug 31 08:45:42 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1372 When using uber mode, Oozie should also make the AM container size 
larger (ryota)
 OOZIE-615 Support high availability for the Oozie service (rkanter)
 OOZIE-1486 Cut down on number of small files created to track a running action 
(mona)
 OOZIE-1476 Add ability to issue kill on Coordinator Action directly with id 
and nominal daterange (mona)


Reply via email to