Repository: oozie
Updated Branches:
  refs/heads/master b0f0dc1d5 -> 53c1c81ef


OOZIE-3227 Eliminate duplicate dependencies when using Hadoop 3 
DistributedCache (dionusos via andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/53c1c81e
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/53c1c81e
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/53c1c81e

Branch: refs/heads/master
Commit: 53c1c81ef7195beb56ded42cec4c840f921ec084
Parents: b0f0dc1
Author: Andras Piros <[email protected]>
Authored: Mon Jun 4 11:08:22 2018 +0200
Committer: Andras Piros <[email protected]>
Committed: Mon Jun 4 11:08:22 2018 +0200

----------------------------------------------------------------------
 .../action/hadoop/DependencyDeduplicator.java   |  82 +++++++++++++++
 .../oozie/action/hadoop/JavaActionExecutor.java |  13 +++
 core/src/main/resources/oozie-default.xml       |  13 +++
 .../hadoop/TestDependencyDeduplicator.java      | 100 +++++++++++++++++++
 .../TestDependencyDeduplicatorCornerCases.java  |  73 ++++++++++++++
 .../src/site/twiki/AG_ActionConfiguration.twiki |  30 ++++++
 release-log.txt                                 |   1 +
 7 files changed, 312 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/main/java/org/apache/oozie/action/hadoop/DependencyDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/DependencyDeduplicator.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/DependencyDeduplicator.java
new file mode 100644
index 0000000..11c62bc
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/action/hadoop/DependencyDeduplicator.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.base.Strings;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.util.XLog;
+
+public class DependencyDeduplicator {
+    public static final XLog LOG = XLog.getLog(DependencyDeduplicator.class);
+
+    private static final String SYMLINK_SEPARATOR = "#";
+    private static final String DEPENDENCY_SEPARATOR = ",";
+
+    public void deduplicate(final Configuration conf, final String key) {
+        if(conf == null || key == null) {
+            return;
+        }
+
+        final String commaSeparatedFilePaths = conf.get(key);
+        LOG.debug("Oozie tries to deduplicate dependencies with key [{0}], 
which has value of [{1}]",
+                key,
+                commaSeparatedFilePaths);
+
+        if(Strings.isNullOrEmpty(commaSeparatedFilePaths)) {
+            return;
+        }
+
+        final Map<String, String> nameToPath = new HashMap<>();
+        final StringBuilder uniqList = new StringBuilder();
+
+        final String[] dependencyPaths = 
commaSeparatedFilePaths.split(DEPENDENCY_SEPARATOR);
+        for (String dependencyPath : dependencyPaths) {
+            try {
+                final String dependencyName = resolveName(
+                        dependencyPath.substring(
+                                dependencyPath.lastIndexOf(File.separator) + 
DEPENDENCY_SEPARATOR.length()
+                        )
+                );
+
+                if (nameToPath.putIfAbsent(dependencyName, dependencyPath) == 
null) {
+                    
uniqList.append(dependencyPath).append(DEPENDENCY_SEPARATOR);
+                } else {
+                    LOG.warn("{0}[{1}] is already defined in {2}. Skipping.",
+                            dependencyName, dependencyPath, key);
+                }
+            } catch (final IndexOutOfBoundsException e) {
+                LOG.warn("Dependency [{0}] is malformed. Skipping.", 
dependencyPath);
+            }
+        }
+        if(uniqList.length() > 0) {
+            uniqList.setLength(uniqList.length() - 1);
+        }
+        conf.set(key, uniqList.toString());
+        LOG.info("{0} dependencies are unified by their filename or symlink 
name.", key);
+    }
+
+    private String resolveName(final String dependencyName) {
+        if(dependencyName.contains(SYMLINK_SEPARATOR)) {
+            return dependencyName.split(SYMLINK_SEPARATOR)[1];
+        }
+        return dependencyName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 0ba3cbf..ed809ef 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.AccessControlException;
@@ -165,6 +166,7 @@ public class JavaActionExecutor extends ActionExecutor {
     private static final Set<String> DISALLOWED_PROPERTIES = new 
HashSet<String>();
     private static final String OOZIE_ACTION_NAME = "oozie.action.name";
     private final static String ACTION_SHARELIB_FOR = 
"oozie.action.sharelib.for.";
+    public static final String OOZIE_ACTION_DEPENDENCY_DEDUPLICATE = 
"oozie.action.dependency.deduplicate";
 
     private static int maxActionOutputLen;
     private static int maxExternalStatsSize;
@@ -175,6 +177,8 @@ public class JavaActionExecutor extends ActionExecutor {
     protected XLog LOG = XLog.getLog(getClass());
     private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
 
+    private static DependencyDeduplicator dependencyDeduplicator = new 
DependencyDeduplicator();
+
     public XConfiguration workflowConf = null;
 
     static {
@@ -875,6 +879,7 @@ public class JavaActionExecutor extends ActionExecutor {
                     prepareXML = 
XmlUtils.prettyPrint(prepareElement).toString().trim();
                 }
             }
+            checkAndDeduplicate(actionConf);
             LauncherHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, 
actionDir, recoveryId, actionConf,
                     prepareXML);
 
@@ -925,6 +930,7 @@ public class JavaActionExecutor extends ActionExecutor {
             // maybe we should add queue to the WF schema, below job-tracker
             actionConfToLauncherConf(actionConf, launcherJobConf);
 
+            checkAndDeduplicate(launcherJobConf);
             return launcherJobConf;
         }
         catch (Exception ex) {
@@ -932,6 +938,13 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
+    private void checkAndDeduplicate(final Configuration conf) {
+        
if(ConfigurationService.getBoolean(OOZIE_ACTION_DEPENDENCY_DEDUPLICATE, false)) 
{
+            dependencyDeduplicator.deduplicate(conf, MRJobConfig.CACHE_FILES);
+            dependencyDeduplicator.deduplicate(conf, 
MRJobConfig.CACHE_ARCHIVES);
+        }
+    }
+
     @VisibleForTesting
     protected static int getMaxOutputData(Configuration actionConf) {
         String userMaxActionOutputLen = 
actionConf.get("oozie.action.max.output.data");

http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index c54db34..14bb200 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -88,6 +88,19 @@
     </property>
 
     <property>
+        <name>oozie.action.dependency.deduplicate</name>
+        <value>false</value>
+        <description>
+            If true, then Oozie will remove all the duplicates from the list 
of dependencies when they are passed to
+            the jobtracker. Higher priority dependencies will remain as the 
following:
+            Original list: 
"/a/a.jar#a.jar,/a/b.jar#b.jar,/b/a.jar,/b/b.jar,/c/d.jar"
+            Deduplicated list: "/a/a.jar#a.jar,/a/b.jar#b.jar,/c/d.jar"
+            In other words, the priority order is: action jar > user workflow libs > action libs > system lib,
+            where the dependency with higher priority is used.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.processing.timezone</name>
         <value>UTC</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicator.java
 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicator.java
new file mode 100644
index 0000000..4e73910
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestDependencyDeduplicator {
+    private String originalList;
+    private String deduplicatedList;
+
+    public TestDependencyDeduplicator(String testName, String 
deduplicatedList, String originalList) {
+        this.originalList = originalList;
+        this.deduplicatedList = deduplicatedList;
+    }
+    private static final String KEY = "key";
+
+    private static final String WITHOUT_SYMLINKS = "/a/a.jar,/b/a.jar";
+    private static final String WITHOUT_SYMLINKS_DEDUPLICATED = "/a/a.jar";
+
+    private static final String FILENAME_ONLY = "a.jar,b.jar#a.jar";
+    private static final String FILENAME_ONLY_DEDUPLICATED = "a.jar";
+
+    private static final String SAME_SYMLINK = "/a/a.jar#a.jar,/a/b.jar#a.jar";
+    private static final String SAME_SYMLINK_DEDUPLICATED = "/a/a.jar#a.jar";
+
+    private static final String DIFFERENT_SYMLINK = 
"/a/a.jar#a.jar,/b/a.jar#b.jar";
+    private static final String DIFFERENT_SYMLINK_DEDUPLICATED = 
DIFFERENT_SYMLINK;
+
+    private static final String NOT_JARS = "/b/a.txt,/a/a.txt";
+    private static final String NOT_JARS_DEDUPLICATED = "/b/a.txt";
+
+    private static final String EMPTY = "";
+    private static final String COMMAS_ONLY = ",,,,,";
+
+    private static final String DIFFERENT_EXTENSON_SYMLINK = 
"/a/a.txt#a,/b/b.txt/a";
+    private static final String DIFFERENT_EXTENSON_SYMLINK_DEDUPLICATED = 
"/a/a.txt#a";
+
+    private static final String UNICODE_PATHS = "/音/a.txt#a,/音/b.txt/a";
+    private static final String UNICODE_PATHS_DEDUPLICATED = "/音/a.txt#a";
+
+    private static final String UNICODE_FILENAMES = "/a/a.txt#音,/b/音";
+    private static final String UNICODE_FILENAMES_DEDUPLICATED = 
"/a/a.txt#音";
+
+    private Configuration conf;
+    private DependencyDeduplicator deduplicator;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection testCases() {
+        return Arrays.asList(new Object[][] {
+                {"Test without symliks", WITHOUT_SYMLINKS_DEDUPLICATED, 
WITHOUT_SYMLINKS},
+                {"Test only with filenames", FILENAME_ONLY_DEDUPLICATED, 
FILENAME_ONLY},
+                {"Test with symliks", SAME_SYMLINK_DEDUPLICATED, SAME_SYMLINK},
+                {"Test with different symliks", 
DIFFERENT_SYMLINK_DEDUPLICATED, DIFFERENT_SYMLINK},
+                {"Test not jars", NOT_JARS_DEDUPLICATED, NOT_JARS},
+                {"Test with empty list", EMPTY, EMPTY},
+                {"Test with list of empty values", EMPTY, COMMAS_ONLY},
+                {"Test with different extension in symlink", 
DIFFERENT_EXTENSON_SYMLINK_DEDUPLICATED, DIFFERENT_EXTENSON_SYMLINK},
+                {"Test with unicode characters in path", 
UNICODE_PATHS_DEDUPLICATED, UNICODE_PATHS},
+                {"Test with unicode characters in path", 
UNICODE_FILENAMES_DEDUPLICATED, UNICODE_FILENAMES}
+        });
+    }
+
+    @Before
+    public void init() {
+        deduplicator = new DependencyDeduplicator();
+        conf = new Configuration();
+    }
+
+    @Test
+    public void testDeduplication() {
+        conf.set(KEY, originalList);
+        deduplicator.deduplicate(conf, KEY);
+        Assert.assertEquals(
+                String.format("Deduplicator should provide [%s] for input 
[%s]", deduplicatedList, originalList),
+                deduplicatedList, conf.get(KEY));
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicatorCornerCases.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicatorCornerCases.java
 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicatorCornerCases.java
new file mode 100644
index 0000000..99350f5
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicatorCornerCases.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestDependencyDeduplicatorCornerCases {
+    private static final Configuration DEFAULT_CONF = new Configuration();
+    private static final String KEY = "key";
+    static {
+        DEFAULT_CONF.set(KEY, "some_value");
+    }
+    private DependencyDeduplicator deduplicator;
+    private Configuration conf;
+    private String key;
+    private String assertMessage;
+
+    public TestDependencyDeduplicatorCornerCases(final String testName, final 
Configuration conf,
+                                                 final String key, final 
String assertMessage) {
+        this.conf = conf;
+        this.key = key;
+        this.assertMessage = assertMessage;
+    }
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection testCases() {
+        return Arrays.asList(new Object[][] {
+                {"Test with null key", DEFAULT_CONF, null, "[null] value as a 
key shall be handled."},
+                {"Test with null key and conf", null, null, "[null] value as a 
key or conf shall be handled."},
+                {"Test with null conf but real key", null, KEY, "[null] value 
as conf shall be handled when key is not null, too."},
+                {"Test with invalid key", DEFAULT_CONF, "nonexistentkey", 
"[null] value for a key shall be handled."}
+        });
+    }
+
+    @Before
+    public void init() {
+        deduplicator = new DependencyDeduplicator();
+    }
+
+    @Test
+    public void testCornerCase() {
+        try {
+            deduplicator.deduplicate(conf, key);
+        } catch (NullPointerException npe) {
+            Assert.fail(assertMessage);
+        } catch (Exception e) {
+            Assert.fail("No exception should be thrown.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/docs/src/site/twiki/AG_ActionConfiguration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/AG_ActionConfiguration.twiki 
b/docs/src/site/twiki/AG_ActionConfiguration.twiki
index f6ffdb2..8c032a7 100644
--- a/docs/src/site/twiki/AG_ActionConfiguration.twiki
+++ b/docs/src/site/twiki/AG_ActionConfiguration.twiki
@@ -50,6 +50,36 @@ The action configuration files use the Hadoop configuration 
syntax.
 
 By default Oozie does not define any default action configurations.
 
+---++ Dependency deduplication
+
+Using Oozie with Hadoop 3 may require dependency file names to be distinguishable:
+having two files with identical names, one on the sharelib and one among your application's dependencies,
+leads to job submission failure.
+To avoid this you can enable the deduplicator by setting oozie.action.dependency.deduplicate=true in oozie-site.xml
+(false by default).
+Dependencies which are closer to your application have higher priority: action jar > user workflow libs > action libs > system lib,
+where the dependency with higher priority is used.
+
+Real world example:
+You have an application workflow which is uploaded to HDFS in /apps/app 
directory. You have your app.jar and dependency jars.
+You also define a Spark action in your workflow and set it to use the system libs; the
+HDFS tree is similar to this:
+<verbatim>
+ + /apps/app/
+   - app.jar
+   - workflow.xml
+   + libs
+     - app.jar
+     - jackson-annotations-1.0.jar
+ + share/lib/
+   + spark
+     - app.jar
+     - jackson-annotations-1.0.jar
+   + oozie
+     - jackson-annotations-1.0.jar
+</verbatim>
+The deduplicator code will create the following list of files:
+=/apps/app/app.jar,/apps/app/libs/jackson-annotations-1.0.jar=
+And no other files will be passed at job submission.
+
 [[index][::Go back to Oozie Documentation Index::]]
 
 </noautolink>

http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 0de0b35..21dfe34 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3227 Eliminate duplicate dependencies when using Hadoop 3 
DistributedCache (dionusos via andras.piros)
 OOZIE-2097 Get rid of non-Javadoc comments (Jan Hentschel via andras.piros)
 OOZIE-3269 Flaky tests in TestCoordMaterializeTriggerService class (pbacsko 
via andras.piros)
 OOZIE-3258 Surefire should not trim stack traces (pbacsko)

Reply via email to