Repository: oozie Updated Branches: refs/heads/master b0f0dc1d5 -> 53c1c81ef
OOZIE-3227 Eliminate duplicate dependencies when using Hadoop 3 DistributedCache (dionusos via andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/53c1c81e Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/53c1c81e Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/53c1c81e Branch: refs/heads/master Commit: 53c1c81ef7195beb56ded42cec4c840f921ec084 Parents: b0f0dc1 Author: Andras Piros <[email protected]> Authored: Mon Jun 4 11:08:22 2018 +0200 Committer: Andras Piros <[email protected]> Committed: Mon Jun 4 11:08:22 2018 +0200 ---------------------------------------------------------------------- .../action/hadoop/DependencyDeduplicator.java | 82 +++++++++++++++ .../oozie/action/hadoop/JavaActionExecutor.java | 13 +++ core/src/main/resources/oozie-default.xml | 13 +++ .../hadoop/TestDependencyDeduplicator.java | 100 +++++++++++++++++++ .../TestDependencyDeduplicatorCornerCases.java | 73 ++++++++++++++ .../src/site/twiki/AG_ActionConfiguration.twiki | 30 ++++++ release-log.txt | 1 + 7 files changed, 312 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/main/java/org/apache/oozie/action/hadoop/DependencyDeduplicator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/DependencyDeduplicator.java b/core/src/main/java/org/apache/oozie/action/hadoop/DependencyDeduplicator.java new file mode 100644 index 0000000..11c62bc --- /dev/null +++ b/core/src/main/java/org/apache/oozie/action/hadoop/DependencyDeduplicator.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import com.google.common.base.Strings; +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.util.XLog; + +public class DependencyDeduplicator { + public static final XLog LOG = XLog.getLog(DependencyDeduplicator.class); + + private static final String SYMLINK_SEPARATOR = "#"; + private static final String DEPENDENCY_SEPARATOR = ","; + + public void deduplicate(final Configuration conf, final String key) { + if(conf == null || key == null) { + return; + } + + final String commaSeparatedFilePaths = conf.get(key); + LOG.debug("Oozie tries to deduplicate dependencies with key [{0}], which has value of [{1}]", + key, + commaSeparatedFilePaths); + + if(Strings.isNullOrEmpty(commaSeparatedFilePaths)) { + return; + } + + final Map<String, String> nameToPath = new HashMap<>(); + final StringBuilder uniqList = new StringBuilder(); + + final String[] dependencyPaths = commaSeparatedFilePaths.split(DEPENDENCY_SEPARATOR); + for (String dependencyPath : dependencyPaths) { + try { + final String dependencyName = resolveName( + dependencyPath.substring( + dependencyPath.lastIndexOf(File.separator) + DEPENDENCY_SEPARATOR.length() + ) + ); + + if (nameToPath.putIfAbsent(dependencyName, dependencyPath) == null) { + 
uniqList.append(dependencyPath).append(DEPENDENCY_SEPARATOR); + } else { + LOG.warn("{0}[{1}] is already defined in {2}. Skipping.", + dependencyName, dependencyPath, key); + } + } catch (final IndexOutOfBoundsException e) { + LOG.warn("Dependency [{0}] is malformed. Skipping.", dependencyPath); + } + } + if(uniqList.length() > 0) { + uniqList.setLength(uniqList.length() - 1); + } + conf.set(key, uniqList.toString()); + LOG.info("{0} dependencies are unified by their filename or symlink name.", key); + } + + private String resolveName(final String dependencyName) { + if(dependencyName.contains(SYMLINK_SEPARATOR)) { + return dependencyName.split(SYMLINK_SEPARATOR)[1]; + } + return dependencyName; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 0ba3cbf..ed809ef 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -55,6 +55,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.TaskLog; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.AccessControlException; @@ -165,6 +166,7 @@ public class JavaActionExecutor extends ActionExecutor { private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); private static final String OOZIE_ACTION_NAME = "oozie.action.name"; private final static String ACTION_SHARELIB_FOR = 
"oozie.action.sharelib.for."; + public static final String OOZIE_ACTION_DEPENDENCY_DEDUPLICATE = "oozie.action.dependency.deduplicate"; private static int maxActionOutputLen; private static int maxExternalStatsSize; @@ -175,6 +177,8 @@ public class JavaActionExecutor extends ActionExecutor { protected XLog LOG = XLog.getLog(getClass()); private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir="; + private static DependencyDeduplicator dependencyDeduplicator = new DependencyDeduplicator(); + public XConfiguration workflowConf = null; static { @@ -875,6 +879,7 @@ public class JavaActionExecutor extends ActionExecutor { prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim(); } } + checkAndDeduplicate(actionConf); LauncherHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, prepareXML); @@ -925,6 +930,7 @@ public class JavaActionExecutor extends ActionExecutor { // maybe we should add queue to the WF schema, below job-tracker actionConfToLauncherConf(actionConf, launcherJobConf); + checkAndDeduplicate(launcherJobConf); return launcherJobConf; } catch (Exception ex) { @@ -932,6 +938,13 @@ public class JavaActionExecutor extends ActionExecutor { } } + private void checkAndDeduplicate(final Configuration conf) { + if(ConfigurationService.getBoolean(OOZIE_ACTION_DEPENDENCY_DEDUPLICATE, false)) { + dependencyDeduplicator.deduplicate(conf, MRJobConfig.CACHE_FILES); + dependencyDeduplicator.deduplicate(conf, MRJobConfig.CACHE_ARCHIVES); + } + } + @VisibleForTesting protected static int getMaxOutputData(Configuration actionConf) { String userMaxActionOutputLen = actionConf.get("oozie.action.max.output.data"); http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index c54db34..14bb200 100644 --- 
a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -88,6 +88,19 @@ </property> <property> + <name>oozie.action.dependency.deduplicate</name> + <value>false</value> + <description> + If true, then Oozie will remove all the duplicates from the list of dependencies when they are passed to + the jobtracker. Higher priority dependencies will remain as the following: + Original list: "/a/a.jar#a.jar,/a/b.jar#b.jar,/b/a.jar,/b/b.jar,/c/d.jar" + Deduplicated list: "/a/a.jar#a.jar,/a/b.jar#b.jar,/c/d.jar" + In other words, the priority order is: action jar > user workflow libs > action libs > system lib, + where the dependency with greater priority is used. + </description> + </property> + + <property> <name>oozie.processing.timezone</name> <value>UTC</value> <description> http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicator.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicator.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicator.java new file mode 100644 index 0000000..4e73910 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicator.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.util.Arrays; +import java.util.Collection; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestDependencyDeduplicator { + private String originalList; + private String deduplicatedList; + + public TestDependencyDeduplicator(String testName, String deduplicatedList, String originalList) { + this.originalList = originalList; + this.deduplicatedList = deduplicatedList; + } + private static final String KEY = "key"; + + private static final String WITHOUT_SYMLINKS = "/a/a.jar,/b/a.jar"; + private static final String WITHOUT_SYMLINKS_DEDUPLICATED = "/a/a.jar"; + + private static final String FILENAME_ONLY = "a.jar,b.jar#a.jar"; + private static final String FILENAME_ONLY_DEDUPLICATED = "a.jar"; + + private static final String SAME_SYMLINK = "/a/a.jar#a.jar,/a/b.jar#a.jar"; + private static final String SAME_SYMLINK_DEDUPLICATED = "/a/a.jar#a.jar"; + + private static final String DIFFERENT_SYMLINK = "/a/a.jar#a.jar,/b/a.jar#b.jar"; + private static final String DIFFERENT_SYMLINK_DEDUPLICATED = DIFFERENT_SYMLINK; + + private static final String NOT_JARS = "/b/a.txt,/a/a.txt"; + private static final String NOT_JARS_DEDUPLICATED = "/b/a.txt"; + + private static final String EMPTY = ""; + private static final String COMMAS_ONLY = ",,,,,"; + + private static final String DIFFERENT_EXTENSON_SYMLINK = 
"/a/a.txt#a,/b/b.txt/a"; + private static final String DIFFERENT_EXTENSON_SYMLINK_DEDUPLICATED = "/a/a.txt#a"; + + private static final String UNICODE_PATHS = "/é³/a.txt#a,/é³/b.txt/a"; + private static final String UNICODE_PATHS_DEDUPLICATED = "/é³/a.txt#a"; + + private static final String UNICODE_FILENAMES = "/a/a.txt#é³,/b/é³"; + private static final String UNICODE_FILENAMES_DEDUPLICATED = "/a/a.txt#é³"; + + private Configuration conf; + private DependencyDeduplicator deduplicator; + + @Parameterized.Parameters(name = "{0}") + public static Collection testCases() { + return Arrays.asList(new Object[][] { + {"Test without symliks", WITHOUT_SYMLINKS_DEDUPLICATED, WITHOUT_SYMLINKS}, + {"Test only with filenames", FILENAME_ONLY_DEDUPLICATED, FILENAME_ONLY}, + {"Test with symliks", SAME_SYMLINK_DEDUPLICATED, SAME_SYMLINK}, + {"Test with different symliks", DIFFERENT_SYMLINK_DEDUPLICATED, DIFFERENT_SYMLINK}, + {"Test not jars", NOT_JARS_DEDUPLICATED, NOT_JARS}, + {"Test with empty list", EMPTY, EMPTY}, + {"Test with list of empty values", EMPTY, COMMAS_ONLY}, + {"Test with different extension in symlink", DIFFERENT_EXTENSON_SYMLINK_DEDUPLICATED, DIFFERENT_EXTENSON_SYMLINK}, + {"Test with unicode characters in path", UNICODE_PATHS_DEDUPLICATED, UNICODE_PATHS}, + {"Test with unicode characters in path", UNICODE_FILENAMES_DEDUPLICATED, UNICODE_FILENAMES} + }); + } + + @Before + public void init() { + deduplicator = new DependencyDeduplicator(); + conf = new Configuration(); + } + + @Test + public void testDeduplication() { + conf.set(KEY, originalList); + deduplicator.deduplicate(conf, KEY); + Assert.assertEquals( + String.format("Deduplicator should provide [%s] for input [%s]", deduplicatedList, originalList), + deduplicatedList, conf.get(KEY)); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicatorCornerCases.java 
---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicatorCornerCases.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicatorCornerCases.java new file mode 100644 index 0000000..99350f5 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestDependencyDeduplicatorCornerCases.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.action.hadoop; + +import java.util.Arrays; +import java.util.Collection; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestDependencyDeduplicatorCornerCases { + private static final Configuration DEFAULT_CONF = new Configuration(); + private static final String KEY = "key"; + static { + DEFAULT_CONF.set(KEY, "some_value"); + } + private DependencyDeduplicator deduplicator; + private Configuration conf; + private String key; + private String assertMessage; + + public TestDependencyDeduplicatorCornerCases(final String testName, final Configuration conf, + final String key, final String assertMessage) { + this.conf = conf; + this.key = key; + this.assertMessage = assertMessage; + } + + @Parameterized.Parameters(name = "{0}") + public static Collection testCases() { + return Arrays.asList(new Object[][] { + {"Test with null key", DEFAULT_CONF, null, "[null] value as a key shall be handled."}, + {"Test with null key and conf", null, null, "[null] value as a key or conf shall be handled."}, + {"Test with null conf but real key", null, KEY, "[null] value as conf shall be handled when key is not null, too."}, + {"Test with invalid key", DEFAULT_CONF, "nonexistentkey", "[null] value for a key shall be handled."} + }); + } + + @Before + public void init() { + deduplicator = new DependencyDeduplicator(); + } + + @Test + public void testCornerCase() { + try { + deduplicator.deduplicate(conf, key); + } catch (NullPointerException npe) { + Assert.fail(assertMessage); + } catch (Exception e) { + Assert.fail("No exception should be thrown."); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/docs/src/site/twiki/AG_ActionConfiguration.twiki ---------------------------------------------------------------------- diff --git 
a/docs/src/site/twiki/AG_ActionConfiguration.twiki b/docs/src/site/twiki/AG_ActionConfiguration.twiki index f6ffdb2..8c032a7 100644 --- a/docs/src/site/twiki/AG_ActionConfiguration.twiki +++ b/docs/src/site/twiki/AG_ActionConfiguration.twiki @@ -50,6 +50,36 @@ The action configuration files use the Hadoop configuration syntax. By default Oozie does not define any default action configurations. +---++ Dependency deduplication + +Using Oozie with Hadoop 3 may require dependency file names to be distinguishable: + having two files with identical names, one in the sharelib and one in your app's dependencies, leads to job submission failure. +To avoid this you can enable the deduplicator by setting oozie.action.dependency.deduplicate=true in oozie-site.xml +(false, by default). +Dependencies which are closer to your application have higher priority: action jar > user workflow libs > action libs > system lib, +where the dependency with greater priority is used. + +Real world example: +You have an application workflow which is uploaded to HDFS in the /apps/app directory. You have your app.jar and dependency jars. +You also define a spark action in your workflow and set it to use system libs; the HDFS tree is similar to this: +<verbatim> + + /apps/app/ + - app.jar + - workflow.xml + + libs + - app.jar + - jackson-annotations-1.0.jar + + share/lib/ + + spark + - app.jar + - jackson-annotations-1.0.jar + + oozie + - jackson-annotations-1.0.jar +</verbatim> +The deduplicator code will create the following list of files: +=/apps/app/app.jar,/apps/app/libs/jackson-annotations-1.0.jar= +And no other files will be passed at job submission. 
+ [[index][::Go back to Oozie Documentation Index::]] </noautolink> http://git-wip-us.apache.org/repos/asf/oozie/blob/53c1c81e/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 0de0b35..21dfe34 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3227 Eliminate duplicate dependencies when using Hadoop 3 DistributedCache (dionusos via andras.piros) OOZIE-2097 Get rid of non-Javadoc comments (Jan Hentschel via andras.piros) OOZIE-3269 Flaky tests in TestCoordMaterializeTriggerService class (pbacsko via andras.piros) OOZIE-3258 Surefire should not trim stack traces (pbacsko)
