Repository: oozie
Updated Branches:
  refs/heads/branch-4.1 f4fdc0e9c -> f94b01b07


OOZIE-1958 address duplication of env variables in 
oozie.launcher.yarn.app.mapreduce.am.env when running with uber mode (ryota)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f94b01b0
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f94b01b0
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f94b01b0

Branch: refs/heads/branch-4.1
Commit: f94b01b07e2e4962d4a7097406a594b75096a977
Parents: f4fdc0e
Author: egashira <[email protected]>
Authored: Mon Aug 11 12:33:43 2014 -0700
Committer: egashira <[email protected]>
Committed: Mon Aug 11 12:33:43 2014 -0700

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 142 +++++++++++++------
 .../action/hadoop/TestJavaActionExecutor.java   |  69 +++++++++
 release-log.txt                                 |   1 +
 3 files changed, 171 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/f94b01b0/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 94b55cf..7f4d473 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -93,7 +93,7 @@ public class JavaActionExecutor extends ActionExecutor {
     public final static String MAX_EXTERNAL_STATS_SIZE = 
"oozie.external.stats.max.size";
     public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
     public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
-    private static final String HADOOP_YARN_UBER_MODE = 
"mapreduce.job.ubertask.enable";
+    public static final String HADOOP_YARN_UBER_MODE = 
"mapreduce.job.ubertask.enable"; // widened from private so TestJavaActionExecutor can read this key
     public static final String HADOOP_MAP_MEMORY_MB = 
"mapreduce.map.memory.mb";
     public static final String HADOOP_CHILD_JAVA_OPTS = 
"mapred.child.java.opts";
     public static final String HADOOP_MAP_JAVA_OPTS = 
"mapreduce.map.java.opts";
@@ -270,56 +270,116 @@ public class JavaActionExecutor extends ActionExecutor {
 
     void updateConfForUberMode(Configuration launcherConf) {
 
-        // memory.mb
-        int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 
1536);
-        int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
-        // YARN_MEMORY_MB_MIN to provide buffer.
-        // suppose launcher map aggressively use high memory, need some 
headroom for AM
-        int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + 
YARN_MEMORY_MB_MIN;
-        // limit to 4096 in case of 32 bit
-        if(launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096){
-            memoryMB = 4096;
-        }
-        launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
-
-        // We already made mapred.child.java.opts and mapreduce.map.java.opts 
equal, so just start with one of them
-        String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
-        String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
-        StringBuilder optsStr = new StringBuilder();
-        int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
-        int heapSizeForAm = extractHeapSizeMB(amChildOpts);
-        int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + 
YARN_MEMORY_MB_MIN;
-        // limit to 3584 in case of 32 bit
-        if(heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
-            heapSize = 3584;
-        }
-        if (amChildOpts != null) {
-            optsStr.append(amChildOpts);
-        }
-        optsStr.append(" ").append(launcherMapOpts.trim());
-        if (heapSize > 0) {
-            // append calculated total heap size to the end
-            optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
-        }
-        launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
-
         // child.env
+        boolean hasConflictEnv = false; // true once map.env gives a variable a value am.env lacks; disables uber mode below
         String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
         if (launcherMapEnv == null) {
             launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
         }
-        String envForAm = launcherConf.get(YARN_AM_ENV);
+        String amEnv = launcherConf.get(YARN_AM_ENV);
         StringBuffer envStr = new StringBuffer();
-        if (envForAm != null) {
-            envStr.append(envForAm);
+        HashMap<String, List<String>> amEnvMap = null;
+        HashMap<String, List<String>> launcherMapEnvMap = null;
+        if (amEnv != null) {
+            envStr.append(amEnv); // am.env is kept verbatim; launcher-map entries are appended below
+            amEnvMap = populateEnvMap(amEnv);
         }
         if (launcherMapEnv != null) {
-            if (envForAm != null) {
-                envStr.append(",");
+            launcherMapEnvMap = populateEnvMap(launcherMapEnv);
+            if (amEnvMap != null) { // drop launcher-map values already in am.env; any other value for a shared key is a conflict
+                Iterator<String> envKeyItr = 
launcherMapEnvMap.keySet().iterator();
+                while (envKeyItr.hasNext()) {
+                    String envKey = envKeyItr.next();
+                    if (amEnvMap.containsKey(envKey)) {
+                        List<String> amValList = amEnvMap.get(envKey);
+                        List<String> launcherValList = 
launcherMapEnvMap.get(envKey);
+                        Iterator<String> valItr = launcherValList.iterator();
+                        while (valItr.hasNext()) {
+                            String val = valItr.next();
+                            if (!amValList.contains(val)) {
+                                hasConflictEnv = true;
+                                break; // exits only the value loop; the key loop keeps going, but the outcome is already decided
+                            }
+                            else {
+                                valItr.remove();
+                            }
+                        }
+                        if (launcherValList.isEmpty()) { // every value was a duplicate of am.env: drop the key
+                            envKeyItr.remove();
+                        }
+                    }
+                }
+            }
+        }
+        if (hasConflictEnv) {
+            launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false); // conflicting values: turn uber mode off instead of merging; memory and java opts are left unchanged
+        }
+        else {
+            if (launcherMapEnvMap != null) {
+                for (String key : launcherMapEnvMap.keySet()) { // NOTE(review): HashMap order is unspecified, so these entries may not keep their configured order (the pre-patch code appended map.env verbatim); a LinkedHashMap in populateEnvMap would preserve it
+                    List<String> launcherValList = launcherMapEnvMap.get(key);
+                    for (String val : launcherValList) {
+                        if (envStr.length() > 0) {
+                            envStr.append(",");
+                        }
+                        envStr.append(key).append("=").append(val);
+                    }
+                }
+            }
+
+            launcherConf.set(YARN_AM_ENV, envStr.toString());
+
+            // memory.mb: the AM gets the larger of the launcher-map and AM sizes plus a buffer
+            int launcherMapMemoryMB = 
launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
+            int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
+            // YARN_MEMORY_MB_MIN is added as a buffer: the launcher map may
+            // use a lot of memory aggressively, so leave some
+            // headroom for the AM
+            int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + 
YARN_MEMORY_MB_MIN;
+            // cap at 4096 MB in case of a 32-bit JVM, unless either input is already >= 4096 MB
+            if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 
4096) {
+                memoryMB = 4096;
+            }
+            launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
+
+            // mapred.child.java.opts and mapreduce.map.java.opts were already
+            // made equal earlier, so reading one of them is enough
+            String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, 
"");
+            String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
+            StringBuilder optsStr = new StringBuilder();
+            int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
+            int heapSizeForAm = extractHeapSizeMB(amChildOpts);
+            int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + 
YARN_MEMORY_MB_MIN;
+            // cap at 3584 MB in case of a 32-bit JVM, unless either input heap is already >= 4096 MB
+            if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 
3584) {
+                heapSize = 3584;
+            }
+            if (amChildOpts != null) {
+                optsStr.append(amChildOpts);
+            }
+            optsStr.append(" ").append(launcherMapOpts.trim());
+            if (heapSize > 0) {
+                // append the computed heap size last so it overrides any earlier -Xmx (HotSpot uses the last one given)
+                optsStr.append(" 
").append("-Xmx").append(heapSize).append("m");
+            }
+            launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
+        }
+    }
+
+    private HashMap<String, List<String>> populateEnvMap(String input) { // parses "K1=v1,K2=v2,..." into key -> list of values, in the order seen (a key may repeat)
+        HashMap<String, List<String>> envMaps = new HashMap<String, 
List<String>>();
+        String[] envEntries = input.split(","); // NOTE(review): a value that itself contains ',' is split apart here
+        for (String envEntry : envEntries) {
+            String[] envKeyVal = envEntry.split("="); // NOTE(review): "A=b=c" keeps only "b"; split("=", 2) would keep "b=c"
+            String envKey = envKeyVal[0].trim();
+            List<String> valList = envMaps.get(envKey);
+            if (valList == null) {
+                valList = new ArrayList<String>();
             }
-            envStr.append(launcherMapEnv);
+            valList.add(envKeyVal[1].trim()); // NOTE(review): throws ArrayIndexOutOfBoundsException for an entry with no '=' or an empty value ("A", "A=", or the empty entry in "X=1,,Y=2")
+            envMaps.put(envKey, valList);
         }
-        launcherConf.set(YARN_AM_ENV, envStr.toString());
+        return envMaps;
     }
 
     public int extractHeapSizeMB(String input) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/f94b01b0/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 72a137c..17c5fd9 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -1788,6 +1788,75 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
         assertEquals("A=foo,B=bar", 
launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
     }
 
+    public void testUpdateConfForUberModeWithEnvDup() throws Exception {
+
+        
Services.get().getConf().setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
 true);
+
+        Element actionXml1 = XmlUtils.parseXml("<java>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>"
+                + "<name-node>" + getNameNodeUri() + "</name-node>" + 
"<configuration>"
+                + 
"<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name>"
+                + 
"<value>JAVA_HOME=/home/blah/java/jdk64/current,A=foo,B=bar</value></property>"
+                + "<property><name>oozie.launcher.mapreduce.map.env</name>"
+                + 
"<value>JAVA_HOME=/home/blah/java/jdk64/latest,C=blah</value></property>" + 
"</configuration>"
+                + "<main-class>MAIN-CLASS</main-class>" + "</java>");
+        JavaActionExecutor ae = new JavaActionExecutor();
+        XConfiguration protoConf = new XConfiguration();
+        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+
+        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
+        WorkflowActionBean action = (WorkflowActionBean) 
wf.getActions().get(0);
+        action.setType(ae.getType());
+
+        Context context = new Context(wf, action);
+        JobConf launcherConf = new JobConf();
+        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, 
actionXml1, launcherConf);
+
+        // uber mode should be disabled since JAVA_HOME points to different 
paths in am.env and map.env
+        assertEquals("false", 
launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
+
+        // complicated env values: LD_LIBRARY_PATH built from a shell expression (backticks, ';')
+        Element actionXml2 = XmlUtils.parseXml("<java>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>"
+                + "<name-node>" + getNameNodeUri() + "</name-node>" + 
"<configuration>" + "<property>"
+                + "<name>oozie.launcher.yarn.app.mapreduce.am.env</name>"
+                + 
"<value>LD_LIBRARY_PATH=$HADOOP_HOME_1/lib/native/`$JAVA_HOME/bin/java -d32 
-version;"
+                + "if [ $? -eq 0 ]; then echo Linux-i386-32; else echo 
Linux-amd64-64;fi`</value></property>"
+                + "<property>" + 
"<name>oozie.launcher.mapreduce.map.env</name>"
+                + 
"<value>LD_LIBRARY_PATH=$HADOOP_HOME_2/lib/native/`$JAVA_HOME/bin/java -d32 
-version;"
+                + "if [ $? -eq 0 ]; then echo Linux-i386-32; else echo 
Linux-amd64-64;fi`</value></property>"
+                + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + 
"</java>");
+
+        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, 
actionXml2, launcherConf); // NOTE(review): reuses the conf from case 1, where uber mode is already "false"; unless createLauncherConf resets it, the next assertion may pass vacuously (case 3 starts from createBaseHadoopConf instead)
+
+        // uber mode should be disabled since LD_LIBRARY_PATH is different in 
am.env and map.env
+        assertEquals("false", 
launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
+
+        Element actionXml3 = XmlUtils
+                .parseXml("<java>"
+                        + "<job-tracker>"
+                        + getJobTrackerUri()
+                        + "</job-tracker>"
+                        + "<name-node>"
+                        + getNameNodeUri()
+                        + "</name-node>"
+                        + "<configuration>"
+                        + 
"<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name>"
+                        + 
"<value>JAVA_HOME=/home/blah/java/jdk64/current,PATH=A,PATH=B</value></property>"
+                        + 
"<property><name>oozie.launcher.mapreduce.map.env</name>"
+                        + 
"<value>JAVA_HOME=/home/blah/java/jdk64/current,PATH=A</value></property>"
+                        + "</configuration>" + 
"<main-class>MAIN-CLASS</main-class>" + "</java>");
+
+        launcherConf = ae.createBaseHadoopConf(context, actionXml3); // fresh base conf, presumably so the uber-mode=false left by cases 1-2 does not carry over
+        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, 
actionXml3, launcherConf);
+
+        // uber mode should be enabled since JAVA_HOME is the same, and PATH 
doesn't conflict
+        assertEquals("true", 
launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
+
+        // the duplicate JAVA_HOME and PATH=A entries from map.env are not appended again
+        String a = launcherConf.get(JavaActionExecutor.YARN_AM_ENV); // NOTE(review): unused local
+        assertEquals("JAVA_HOME=/home/blah/java/jdk64/current,PATH=A,PATH=B",
+                launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
+    }
+
     public void testUpdateConfForUberModeForJavaOpts() throws Exception {
         
Services.get().getConf().setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable",
 true);
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/f94b01b0/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a6032c6..b2c59d7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (4.1 - unreleased)
 
+OOZIE-1958 address duplication of env variables in 
oozie.launcher.yarn.app.mapreduce.am.env when running with uber mode (ryota)
 OOZIE-1920 Capture Output for SSH Action doesn't work (Richard Williams via 
rkanter)
 OOZIE-1961 Remove requireJavaVersion from enforcer rules (lars_francke via 
rkanter)
 OOZIE-1883 hostnameFilter has invalid url-pattern (dvillegas via rkanter)

Reply via email to