OOZIE-2068 Configuration as part of sharelib
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/87a6d053 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/87a6d053 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/87a6d053 Branch: refs/heads/master Commit: 87a6d0536149c9e84a29490b78ee49d743bad8d1 Parents: 488855a Author: Purshotam Shah <[email protected]> Authored: Tue Jan 27 16:53:50 2015 -0800 Committer: Purshotam Shah <[email protected]> Committed: Tue Jan 27 16:53:50 2015 -0800 ---------------------------------------------------------------------- .../action/hadoop/Hive2ActionExecutor.java | 5 + .../oozie/action/hadoop/HiveActionExecutor.java | 5 + .../oozie/action/hadoop/JavaActionExecutor.java | 86 +++- .../wf/WorkflowNotificationXCommand.java | 83 +--- .../apache/oozie/service/ShareLibService.java | 206 ++++++--- core/src/main/resources/oozie-default.xml | 9 + .../action/hadoop/TestJavaActionExecutor.java | 10 +- .../wf/TestWorkflowNotificationXCommand.java | 2 +- .../oozie/service/TestHAShareLibService.java | 2 +- .../oozie/service/TestShareLibService.java | 425 ++++++++++++++----- .../src/site/twiki/WorkflowFunctionalSpec.twiki | 2 + .../apache/oozie/hadoop/utils/HadoopShims.java | 4 + .../apache/oozie/hadoop/utils/HadoopShims.java | 5 +- .../apache/oozie/hadoop/utils/HadoopShims.java | 24 +- .../apache/oozie/hadoop/utils/HadoopShims.java | 24 +- release-log.txt | 1 + .../oozie/tools/TestOozieSharelibCLI.java | 4 +- 17 files changed, 645 insertions(+), 252 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java index 
d70c3e1..8f86c09 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java @@ -134,4 +134,9 @@ public class Hive2ActionExecutor extends ScriptLanguageActionExecutor { return HIVE2_SCRIPT; } + @Override + public String[] getShareLibFilesForActionConf() { + return new String[] { "hive-site.xml" }; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java index 832bbe6..701d1c1 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java @@ -140,4 +140,9 @@ public class HiveActionExecutor extends ScriptLanguageActionExecutor { return XOozieClient.HIVE_SCRIPT; } + @Override + public String[] getShareLibFilesForActionConf() { + return new String[]{"hive-site.xml"}; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index f207d74..6cf6ea3 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -18,17 +18,15 @@ package org.apache.oozie.action.hadoop; -import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import 
java.io.PrintStream; import java.io.StringReader; import java.net.ConnectException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -81,6 +79,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.oozie.hadoop.utils.HadoopShims; + public class JavaActionExecutor extends ActionExecutor { private static final String HADOOP_USER = "user.name"; @@ -579,22 +578,74 @@ public class JavaActionExecutor extends ActionExecutor { protected void addShareLib(Configuration conf, String[] actionShareLibNames) throws ActionExecutorException { + Set<String> confSet = new HashSet<String>(Arrays.asList(getShareLibFilesForActionConf() == null ? new String[0] + : getShareLibFilesForActionConf())); + + Set<Path> sharelibList = new HashSet<Path>(); + if (actionShareLibNames != null) { try { ShareLibService shareLibService = Services.get().get(ShareLibService.class); FileSystem fs = shareLibService.getFileSystem(); if (fs != null) { - for (String actionShareLibName : actionShareLibNames) { + for (String actionShareLibName : actionShareLibNames) { List<Path> listOfPaths = shareLibService.getShareLibJars(actionShareLibName); if (listOfPaths != null && !listOfPaths.isEmpty()) { - for (Path actionLibPath : listOfPaths) { - JobUtils.addFileToClassPath(actionLibPath, conf, fs); - DistributedCache.createSymlink(conf); + String fragmentName = new URI(actionLibPath.toString()).getFragment(); + Path pathWithFragment = fragmentName == null ? actionLibPath : new Path(new URI( + actionLibPath.toString()).getPath()); + String fileName = fragmentName == null ? 
actionLibPath.getName() : fragmentName; + if (confSet.contains(fileName)) { + Configuration jobXmlConf = shareLibService.getShareLibConf(actionShareLibName, + pathWithFragment); + checkForDisallowedProps(jobXmlConf, actionLibPath.getName()); + XConfiguration.injectDefaults(jobXmlConf, conf); + LOG.trace("Adding properties of " + actionLibPath + " to job conf"); + } + else { + // Filtering out duplicate jars or files + sharelibList.add(new Path(actionLibPath.toUri()) { + @Override + public int hashCode() { + return getName().hashCode(); + } + @Override + public String getName() { + try { + return (new URI(toString())).getFragment() == null ? new Path(toUri()).getName() + : (new URI(toString())).getFragment(); + } + catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + @Override + public boolean equals(Object input) { + if (input == null) { + return false; + } + if (input == this) { + return true; + } + if (!(input instanceof Path)) { + return false; + } + return getName().equals(((Path) input).getName()); + } + }); + } } } } } + for (Path libPath : sharelibList) { + addToCache(conf, libPath, libPath.toUri().getPath(), false); + } + } + catch (URISyntaxException ex) { + throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "Error configuring sharelib", + ex.getMessage()); } catch (IOException ex) { throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", @@ -697,7 +748,7 @@ public class JavaActionExecutor extends ActionExecutor { } addAllShareLibs(appPath, conf, context, actionXml); - } + } // Adds action specific share libs and common share libs private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml) @@ -719,9 +770,18 @@ public class JavaActionExecutor extends ActionExecutor { ioe.getMessage()); } // Action sharelibs are only added if user has specified to use system libpath - if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, 
false)) { - // add action specific sharelibs - addShareLib(conf, getShareLibNames(context, actionXml, conf)); + if (conf.get(OozieClient.USE_SYSTEM_LIBPATH) == null) { + if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, + ConfigurationService.getBoolean(OozieClient.USE_SYSTEM_LIBPATH))) { + // add action specific sharelibs + addShareLib(conf, getShareLibNames(context, actionXml, conf)); + } + } + else { + if (conf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) { + // add action specific sharelibs + addShareLib(conf, getShareLibNames(context, actionXml, conf)); + } } } @@ -1453,6 +1513,10 @@ public class JavaActionExecutor extends ActionExecutor { return null; } + public String[] getShareLibFilesForActionConf() { + return null; + } + /** * Sets some data for the action on completion * http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java index 7ea0e1e..d973def 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/WorkflowNotificationXCommand.java @@ -22,38 +22,28 @@ import org.apache.oozie.client.OozieClient; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.command.CommandException; -import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.NotificationXCommand; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.ParamChecker; -import org.apache.oozie.util.XLog; -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; - -public class 
WorkflowNotificationXCommand extends WorkflowXCommand<Void> { - - public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout"; +public class WorkflowNotificationXCommand extends NotificationXCommand { private static final String STATUS_PATTERN = "\\$status"; private static final String JOB_ID_PATTERN = "\\$jobId"; private static final String NODE_NAME_PATTERN = "\\$nodeName"; - private String url; - private String id; - - //this variable is package private only for test purposes - int retries = 0; - public WorkflowNotificationXCommand(WorkflowJobBean workflow) { super("job.notification", "job.notification", 0); ParamChecker.notNull(workflow, "workflow"); - id = workflow.getId(); + jobId = workflow.getId(); url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL); if (url != null) { url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString()); + proxyConf = workflow.getWorkflowInstance().getConf() + .get(OozieClient.WORKFLOW_NOTIFICATION_PROXY, ConfigurationService.get(NOTIFICATION_PROXY_KEY)); + LOG.debug("Proxy :" + proxyConf); } } @@ -61,7 +51,7 @@ public class WorkflowNotificationXCommand extends WorkflowXCommand<Void> { super("action.notification", "job.notification", 0); ParamChecker.notNull(workflow, "workflow"); ParamChecker.notNull(action, "action"); - id = action.getId(); + jobId = action.getId(); url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL); if (url != null) { url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); @@ -72,65 +62,14 @@ public class WorkflowNotificationXCommand extends WorkflowXCommand<Void> { else { url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString()); } + proxyConf = workflow.getWorkflowInstance().getConf() + .get(OozieClient.WORKFLOW_NOTIFICATION_PROXY, ConfigurationService.get(NOTIFICATION_PROXY_KEY)); + LOG.debug("Proxy :" 
+ proxyConf); } } @Override - protected void setLogInfo() { - LogUtils.setLogInfo(id); - } - - @Override - protected boolean isLockRequired() { - return false; - } - - @Override - public String getEntityKey() { - return url; - } - - @Override protected void loadState() throws CommandException { + LogUtils.setLogInfo(jobId); } - - @Override - protected void verifyPrecondition() throws CommandException, PreconditionException { - } - - @Override - protected Void execute() throws CommandException { - if (url != null) { - int timeout = ConfigurationService.getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY); - try { - URL url = new URL(this.url); - HttpURLConnection urlConn = (HttpURLConnection) url.openConnection(); - urlConn.setConnectTimeout(timeout); - urlConn.setReadTimeout(timeout); - if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { - handleRetry(); - } - } - catch (IOException ex) { - handleRetry(); - } - } - return null; - } - - private void handleRetry() { - if (retries < 3) { - retries++; - this.resetUsed(); - queue(this, 60 * 1000); - } - else { - LOG.warn(XLog.OPS, "could not send notification [{0}]", url); - } - } - - public String getUrl() { - return url; - } - } http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/main/java/org/apache/oozie/service/ShareLibService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java b/core/src/main/java/org/apache/oozie/service/ShareLibService.java index bb0c7ed..51eb297 100644 --- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java +++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java @@ -41,8 +41,8 @@ import java.util.Properties; import java.util.Set; import java.util.TimeZone; import java.util.Map.Entry; - import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -54,11 +54,12 @@ import org.apache.oozie.client.rest.JsonUtils; import org.apache.oozie.hadoop.utils.HadoopShims; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; +import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; - import com.google.common.annotations.VisibleForTesting; import org.apache.oozie.ErrorCode; +import org.jdom.JDOMException; public class ShareLibService implements Service, Instrumentable { @@ -74,9 +75,9 @@ public class ShareLibService implements Service, Instrumentable { private static final String PERMISSION_STRING = "-rwxr-xr-x"; - public static final String LAUNCHER_PREFIX = "launcher_"; + public static final String LAUNCHER_LIB_PREFIX = "launcher_"; - public static final String SHARED_LIB_PREFIX = "lib_"; + public static final String SHARE_LIB_PREFIX = "lib_"; public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); @@ -84,9 +85,13 @@ public class ShareLibService implements Service, Instrumentable { private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>(); + private Map<String, Map<Path, Configuration>> shareLibConfigMap = new HashMap<String, Map<Path, Configuration>>(); + private Map<String, List<Path>> launcherLibMap = new HashMap<String, List<Path>>(); - //symlink mapping. Oozie keeps on checking symlink path and if changes, Oozie reloads the sharelib + private Set<String> actionConfSet = new HashSet<String>(); + + // symlink mapping. 
Oozie keeps on checking symlink path and if changes, Oozie reloads the sharelib private Map<String, Map<Path, Path>> symlinkMapping = new HashMap<String, Map<Path, Path>>(); private static XLog LOG = XLog.getLog(ShareLibService.class); @@ -118,6 +123,8 @@ public class ShareLibService implements Service, Instrumentable { URI uri = launcherlibPath.toUri(); try { fs = FileSystem.get(has.createJobConf(uri.getAuthority())); + //cache action key sharelib conf list + cacheActionKeySharelibConfList(); updateLauncherLib(); updateShareLib(); } @@ -143,8 +150,8 @@ public class ShareLibService implements Service, Instrumentable { // Only one server should purge sharelib if (Services.get().get(JobsConcurrencyService.class).isLeader()) { final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime(); - purgeLibs(fs, LAUNCHER_PREFIX, current); - purgeLibs(fs, SHARED_LIB_PREFIX, current); + purgeLibs(fs, LAUNCHER_LIB_PREFIX, current); + purgeLibs(fs, SHARE_LIB_PREFIX, current); } } catch (IOException e) { @@ -161,9 +168,7 @@ public class ShareLibService implements Service, Instrumentable { * Recursively change permissions. * * @throws IOException Signals that an I/O exception has occurred. - * @throws ClassNotFoundException the class not found exception */ - private void updateLauncherLib() throws IOException { if (isShipLauncherEnabled) { if (fs == null) { @@ -180,12 +185,11 @@ public class ShareLibService implements Service, Instrumentable { } /** - * Copy launcher jars to Temp directory + * Copy launcher jars to Temp directory. * * @param fs the FileSystem - * @param tmpShareLibPath destination path + * @param tmpLauncherLibPath the tmp launcher lib path * @throws IOException Signals that an I/O exception has occurred. 
- * @throws ClassNotFoundException the class not found exception */ private void setupLauncherLibPath(FileSystem fs, Path tmpLauncherLibPath) throws IOException { @@ -270,36 +274,51 @@ public class ShareLibService implements Service, Instrumentable { * @param fs the FileSystem * @param rootDir the root directory * @param listOfPaths the list of paths + * @param shareLibKey the share lib key * @return the path recursively * @throws IOException Signals that an I/O exception has occurred. */ - private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths) throws IOException { + private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths, String shareLibKey, + Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException { if (rootDir == null) { return; } try { - if(fs.isFile(new Path(new URI(rootDir.toString()).getPath()))){ + if (fs.isFile(new Path(new URI(rootDir.toString()).getPath()))) { + Path filePath = new Path(new URI(rootDir.toString()).getPath()); + + if (isFilePartOfConfList(rootDir)) { + cachePropertyFile(filePath, shareLibKey, shareLibConfigMap); + } + listOfPaths.add(rootDir); return; } + + FileStatus[] status = fs.listStatus(rootDir); + if (status == null) { + LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache"); + return; + } + + for (FileStatus file : status) { + if (file.isDir()) { + getPathRecursively(fs, file.getPath(), listOfPaths, shareLibKey, shareLibConfigMap); + } + else { + if (isFilePartOfConfList(file.getPath())) { + cachePropertyFile(file.getPath(), shareLibKey, shareLibConfigMap); + } + listOfPaths.add(file.getPath()); + } + } } catch (URISyntaxException e) { throw new IOException(e); } - FileStatus[] status = fs.listStatus(rootDir); - if (status == null) { - LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache"); - return; - } - - for (FileStatus file : status) { - if (file.isDir()) { - getPathRecursively(fs, file.getPath(), listOfPaths); 
- } - else { - listOfPaths.add(file.getPath()); - } + catch (JDOMException e) { + throw new IOException(e); } } @@ -316,7 +335,7 @@ public class ShareLibService implements Service, Instrumentable { * * @param shareLibKey the sharelib key * @return List of paths - * @throws IOException + * @throws IOException Signals that an I/O exception has occurred. */ public List<Path> getShareLibJars(String shareLibKey) throws IOException { // Sharelib map is empty means that on previous or startup attempt of @@ -343,14 +362,21 @@ public class ShareLibService implements Service, Instrumentable { for (Path path : symlinkMapping.get(shareLibKey).keySet()) { if (!symlinkMapping.get(shareLibKey).get(path).equals(fileSystem.getSymLinkTarget(path))) { synchronized (ShareLibService.class) { - Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>(shareLibMap); - Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(symlinkMapping); + Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap); + + Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>( + shareLibConfigMap); + + Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>( + symlinkMapping); - LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]", shareLibKey, - path, fileSystem.getSymLinkTarget(path))); - loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, sharelibMappingFile, shareLibKey); - shareLibMap = tempShareLibMap; + LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]", + shareLibKey, path, fileSystem.getSymLinkTarget(path))); + loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, + shareLibKey); + shareLibMap = tmpShareLibMap; symlinkMapping = tmpSymlinkMapping; + shareLibConfigMap = tmpShareLibConfigMap; return; } @@ -364,8 +390,7 @@ 
public class ShareLibService implements Service, Instrumentable { * * @param shareLibKey the shareLib key * @return launcher jars paths - * @throws ClassNotFoundException - * @throws IOException + * @throws IOException Signals that an I/O exception has occurred. */ public List<Path> getSystemLibJars(String shareLibKey) throws IOException { List<Path> returnList = new ArrayList<Path>(); @@ -439,6 +464,7 @@ public class ShareLibService implements Service, Instrumentable { * * @param fs the fs * @param prefix the prefix + * @param current the current time * @throws IOException Signals that an I/O exception has occurred. */ private void purgeLibs(FileSystem fs, final String prefix, final Date current) throws IOException { @@ -485,7 +511,6 @@ public class ShareLibService implements Service, Instrumentable { public void destroy() { shareLibMap.clear(); launcherLibMap.clear(); - } @Override @@ -511,11 +536,12 @@ public class ShareLibService implements Service, Instrumentable { Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>(); Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(); + Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>(); if (!StringUtils.isEmpty(sharelibMappingFile.trim())) { String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822( new Date(fs.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT"); - loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, sharelibMappingFile, null); + loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null); status.put("sharelibMetaFile", sharelibMappingFile); status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp); status.put("sharelibMetaFileOldTimeStamp", sharelibMetaFileOldTimeStamp); @@ -523,8 +549,8 @@ public class ShareLibService implements Service, Instrumentable { } else { Path shareLibpath = 
getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(), - SHARED_LIB_PREFIX); - loadShareLibfromDFS(tempShareLibMap, shareLibpath); + SHARE_LIB_PREFIX); + loadShareLibfromDFS(tempShareLibMap, shareLibpath, tmpShareLibConfigMap); if (shareLibpath != null) { status.put("sharelibDirNew", shareLibpath.toString()); @@ -535,6 +561,7 @@ public class ShareLibService implements Service, Instrumentable { } shareLibMap = tempShareLibMap; symlinkMapping = tmpSymlinkMapping; + shareLibConfigMap = tmpShareLibConfigMap; return status; } @@ -545,7 +572,8 @@ public class ShareLibService implements Service, Instrumentable { * @param shareLibpath the share libpath * @throws IOException Signals that an I/O exception has occurred. */ - private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath) throws IOException { + private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath, + Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException { if (shareLibpath == null) { LOG.info("No share lib directory found"); @@ -564,7 +592,7 @@ public class ShareLibService implements Service, Instrumentable { continue; } List<Path> listOfPaths = new ArrayList<Path>(); - getPathRecursively(fs, dir.getPath(), listOfPaths); + getPathRecursively(fs, dir.getPath(), listOfPaths, dir.getPath().getName(), shareLibConfigMap); shareLibMap.put(dir.getPath().getName(), listOfPaths); LOG.info("Share lib for " + dir.getPath().getName() + ":" + listOfPaths); @@ -577,13 +605,14 @@ public class ShareLibService implements Service, Instrumentable { * value is the DFS location of sharelib files. 
* * @param shareLibMap the share lib jar map - * @param sharelipFileMapping the sharelip file mapping * @param symlinkMapping the symlink mapping - * @parm shareLibKey the sharelib key + * @param sharelibFileMapping the sharelib file mapping + * @param shareLibKey the share lib key * @throws IOException Signals that an I/O exception has occurred. + * @parm shareLibKey the sharelib key */ - private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, - Map<String, Map<Path, Path>> symlinkMapping, String sharelibFileMapping, String shareLibKey) + private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, Map<String, Map<Path, Path>> symlinkMapping, + Map<String, Map<Path, Configuration>> shareLibConfigMap, String sharelibFileMapping, String shareLibKey) throws IOException { Path shareFileMappingPath = new Path(sharelibFileMapping); @@ -597,21 +626,22 @@ public class ShareLibService implements Service, Instrumentable { String mapKey = key.substring(SHARE_LIB_CONF_PREFIX.length() + 1); if (key.toLowerCase().startsWith(SHARE_LIB_CONF_PREFIX) && (shareLibKey == null || shareLibKey.equals(mapKey))) { - loadSharelib(shareLibMap, symlinkMapping, mapKey, ((String) prop.get(key)).split(",")); + loadSharelib(shareLibMap, symlinkMapping, shareLibConfigMap, mapKey, + ((String) prop.get(key)).split(",")); } } } private void loadSharelib(Map<String, List<Path>> tmpShareLibMap, Map<String, Map<Path, Path>> tmpSymlinkMapping, - String shareLibKey, String pathList[]) throws IOException { + Map<String, Map<Path, Configuration>> shareLibConfigMap, String shareLibKey, String pathList[]) + throws IOException { List<Path> listOfPaths = new ArrayList<Path>(); Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>(); HadoopShims fileSystem = new HadoopShims(fs); for (String dfsPath : pathList) { Path path = new Path(dfsPath); - - getPathRecursively(fs, path, listOfPaths); + getPathRecursively(fs, new Path(dfsPath), listOfPaths, shareLibKey, 
shareLibConfigMap); if (HadoopShims.isSymlinkSupported() && fileSystem.isSymlink(path)) { symlinkMappingforAction.put(path, fileSystem.getSymLinkTarget(path)); } @@ -631,7 +661,7 @@ public class ShareLibService implements Service, Instrumentable { */ private Path getLauncherlibPath() { String formattedDate = dateFormat.format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime()); - Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_PREFIX + Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX + formattedDate); return tmpLauncherLibPath; } @@ -670,7 +700,7 @@ public class ShareLibService implements Service, Instrumentable { max = d; } } - //If there are no timestamped directories, fall back to root directory + // If there are no timestamped directories, fall back to root directory if (path == null) { path = rootDir; } @@ -710,7 +740,7 @@ public class ShareLibService implements Service, Instrumentable { String sharelibPath = "(unavailable)"; try { Path libPath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(), - SHARED_LIB_PREFIX); + SHARE_LIB_PREFIX); if (libPath != null) { sharelibPath = libPath.toUri().toString(); } @@ -768,6 +798,22 @@ public class ShareLibService implements Service, Instrumentable { } }); + instr.addVariable("libs", "sharelib.cached.config.file", new Instrumentation.Variable<String>() { + @Override + public String getValue() { + Map<String, Map<Path, Configuration>> shareLibConfigMap = getShareLibConfigMap(); + if (shareLibConfigMap != null && !shareLibConfigMap.isEmpty()) { + StringBuffer bf = new StringBuffer(); + + for (String path : shareLibConfigMap.keySet()) { + bf.append(path).append(";"); + } + return bf.toString(); + } + return "(none)"; + } + }); + } /** @@ -780,4 +826,58 @@ public class ShareLibService implements Service, Instrumentable { public FileSystem getFileSystem() { return fs; } + + 
/** + * Cache XML conf file + * + * @param hdfsPath the hdfs path + * @param shareLibKey the share lib key + * @throws IOException Signals that an I/O exception has occurred. + * @throws JDOMException + */ + private void cachePropertyFile(Path hdfsPath, String shareLibKey, + Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException { + Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey); + if (confMap == null) { + confMap = new HashMap<Path, Configuration>(); + shareLibConfigMap.put(shareLibKey, confMap); + } + Configuration xmlConf = new XConfiguration(fs.open(hdfsPath)); + confMap.put(hdfsPath, xmlConf); + + } + + private void cacheActionKeySharelibConfList() { + ActionService actionService = Services.get().get(ActionService.class); + Set<String> actionTypes = actionService.getActionTypes(); + for (String key : actionTypes) { + ActionExecutor executor = actionService.getExecutor(key); + if (executor instanceof JavaActionExecutor) { + JavaActionExecutor jexecutor = (JavaActionExecutor) executor; + actionConfSet.addAll( + new HashSet<String>(Arrays.asList(jexecutor.getShareLibFilesForActionConf() == null ? new String[0] + : jexecutor.getShareLibFilesForActionConf()))); + } + } + } + + public Configuration getShareLibConf(String inputKey, Path path) { + Configuration conf = new Configuration(); + if (shareLibConfigMap.containsKey(inputKey)) { + conf = shareLibConfigMap.get(inputKey).get(path); + } + + return conf; + } + + @VisibleForTesting + public Map<String, Map<Path, Configuration>> getShareLibConfigMap() { + return shareLibConfigMap; + } + + private boolean isFilePartOfConfList(Path path) throws URISyntaxException { + String fragmentName = new URI(path.toString()).getFragment(); + String fileName = fragmentName == null ? 
path.getName() : fragmentName; + return actionConfSet.contains(fileName); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index db8aa24..31c80ac 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2429,4 +2429,13 @@ </description> </property> + <property> + <name>oozie.use.system.libpath</name> + <value>false</value> + <description> + Default value of oozie.use.system.libpath. If user haven't specified =oozie.use.system.libpath= + in the job.properties and this value is true and Oozie will include sharelib jars for workflow. + </description> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 48166a5..c993132 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -1261,7 +1261,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { WorkflowAppService wps = Services.get().get(WorkflowAppService.class); - Path systemLibPath = new Path(wps.getSystemLibPath(), ShareLibService.SHARED_LIB_PREFIX + Path systemLibPath = new Path(wps.getSystemLibPath(), ShareLibService.SHARE_LIB_PREFIX + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()).toString()); Path javaShareLibPath = new Path(systemLibPath, "java"); @@ -1376,7 +1376,7 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase { new Services().init(); // Create the dir WorkflowAppService wps = Services.get().get(WorkflowAppService.class); - Path systemLibPath = new Path(wps.getSystemLibPath(), ShareLibService.SHARED_LIB_PREFIX + Path systemLibPath = new Path(wps.getSystemLibPath(), ShareLibService.SHARE_LIB_PREFIX + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()).toString()); Path javaShareLibPath = new Path(systemLibPath, "java-action-executor"); getFileSystem().mkdirs(javaShareLibPath); @@ -1385,7 +1385,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { JobConf conf = ae.createBaseHadoopConf(context, eActionXml); // Despite systemLibPath is not fully qualified and the action refers to the // second namenode the next line won't throw exception because default fs is used - ae.addShareLib(conf, new String[]{"java-action-executor"}); + ae.addShareLib(conf, new String[] { "java-action-executor" }); // Set sharelib to a full path (i.e. include scheme and authority) Services.get().destroy(); @@ -1394,7 +1394,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Services.get().setService(ShareLibService.class); conf = ae.createBaseHadoopConf(context, eActionXml); // The next line should not throw an Exception because it will get the scheme and authority from the sharelib path - ae.addShareLib(conf, new String[]{"java-action-executor"}); + ae.addShareLib(conf, new String[] { "java-action-executor" }); } public void testFilesystemScheme() throws Exception { @@ -2224,7 +2224,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { WorkflowAppService wps = Services.get().get(WorkflowAppService.class); - Path systemLibPath = new Path(wps.getSystemLibPath(), ShareLibService.SHARED_LIB_PREFIX + Path systemLibPath = new Path(wps.getSystemLibPath(), ShareLibService.SHARE_LIB_PREFIX + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()).toString()); File jarFile = IOUtils.createJar(new 
File(getTestCaseDir()), "sourcejar.jar", LauncherMainTester.class); http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java index b427180..4d2df21 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowNotificationXCommand.java @@ -69,7 +69,7 @@ public class TestWorkflowNotificationXCommand extends XTestCase { Mockito.when(workflow.getStatus()).thenReturn(WorkflowJob.Status.SUCCEEDED); Mockito.when(workflow.getWorkflowInstance()).thenReturn(wfi); WorkflowNotificationXCommand command = new WorkflowNotificationXCommand(workflow); - command.retries = 3; + command.setRetry(3); long start = System.currentTimeMillis(); command.call(); long end = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java b/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java index d2ad881..791f568 100644 --- a/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java +++ b/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java @@ -69,7 +69,7 @@ public class TestHAShareLibService extends ZKXTestCase { Date time = new Date(System.currentTimeMillis()); Path basePath = new Path(Services.get().getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path libpath = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + 
ShareLibService.dateFormat.format(time)); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); fs.mkdirs(libpath); Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig"); http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/core/src/test/java/org/apache/oozie/service/TestShareLibService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestShareLibService.java b/core/src/test/java/org/apache/oozie/service/TestShareLibService.java index f261448..842b2d5 100644 --- a/core/src/test/java/org/apache/oozie/service/TestShareLibService.java +++ b/core/src/test/java/org/apache/oozie/service/TestShareLibService.java @@ -20,7 +20,9 @@ package org.apache.oozie.service; import java.io.File; import java.io.IOException; +import java.io.PrintWriter; import java.net.URI; +import java.net.URLDecoder; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; @@ -36,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.hadoop.ActionExecutorTestCase.Context; +import org.apache.oozie.action.hadoop.HiveActionExecutor; import org.apache.oozie.action.hadoop.JavaActionExecutor; import org.apache.oozie.action.hadoop.PigActionExecutor; import org.apache.oozie.action.hadoop.TestJavaActionExecutor; @@ -54,6 +57,8 @@ public class TestShareLibService extends XFsTestCase { private static String testCaseDirPath; String shareLibPath = "shareLibPath"; SimpleDateFormat dt = new SimpleDateFormat("yyyyMMddHHmmss"); + final String sharelibPath = "sharelib"; + final String metaFile = "/user/test/config.properties"; @Override protected void setUp() throws Exception { @@ -69,11 +74,15 @@ public class TestShareLibService extends XFsTestCase { private void setSystemProps() throws IOException { IOUtils.createJar(new 
File(getTestCaseDir()), MyOozie.class.getName() + ".jar", MyOozie.class); IOUtils.createJar(new File(getTestCaseDir()), MyPig.class.getName() + ".jar", MyPig.class); - Configuration conf = services.getConf(); + IOUtils.createJar(new File(getTestCaseDir()), TestHive.class.getName() + ".jar", TestHive.class); + + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(WorkflowAppService.SYSTEM_LIB_PATH, getFsTestCaseDir() + "/share/lib"); conf.set(Services.CONF_SERVICE_CLASSES, conf.get(Services.CONF_SERVICE_CLASSES) + "," + DummyShareLibService.class.getName()); - conf.set(ActionService.CONF_ACTION_EXECUTOR_CLASSES, DummyPigActionExecutor.class.getName()); + conf.setStrings(ActionService.CONF_ACTION_EXECUTOR_CLASSES, DummyPigActionExecutor.class.getName(), + DummyHiveActionExecutor.class.getName()); + } public static class DummyShareLibService extends ShareLibService { @@ -96,17 +105,30 @@ public class TestShareLibService extends XFsTestCase { } } + public static class DummyHiveActionExecutor extends HiveActionExecutor { + public DummyHiveActionExecutor() { + } + + @Override + public List<Class> getLauncherClasses() { + return Arrays.asList((Class) TestHive.class); + } + } + static class MyOozie { } static class MyPig { } + static class TestHive { + } + @Test public void testfailFast() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.FAIL_FAST_ON_STARTUP, "true"); // Set dummyfile as metafile which doesn't exist. 
conf.set(ShareLibService.SHARELIB_MAPPING_FILE, String.valueOf(new Date().getTime())); @@ -126,7 +148,7 @@ public class TestShareLibService extends XFsTestCase { public void testCreateLauncherLibPath() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); try { services.init(); @@ -146,7 +168,7 @@ public class TestShareLibService extends XFsTestCase { public void testAddShareLibDistributedCache() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); try { @@ -163,19 +185,8 @@ public class TestShareLibService extends XFsTestCase { PigActionExecutor ae = new PigActionExecutor(); Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml); ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf); - - URI[] cacheFiles = DistributedCache.getCacheFiles(jobConf); - String cacheFilesStr = Arrays.toString(cacheFiles); - assertTrue(cacheFilesStr.contains(MyPig.class.getName() + ".jar")); - assertTrue(cacheFilesStr.contains(MyOozie.class.getName() + ".jar")); - // Hadoop 2 has two extra jars - if (cacheFiles.length == 4) { - assertTrue(cacheFilesStr.contains("MRAppJar.jar")); - assertTrue(cacheFilesStr.contains("hadoop-mapreduce-client-jobclient-")); - } - else { - assertEquals(2, cacheFiles.length); - } + verifyFilesInDistributedCache(DistributedCache.getCacheFiles(jobConf), MyPig.class.getName() + ".jar", + MyOozie.class.getName() + ".jar"); } finally { services.destroy(); @@ -186,7 +197,7 @@ public class TestShareLibService extends XFsTestCase { public void testAddShareLib_pig() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = 
services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); try { services.init(); @@ -202,19 +213,8 @@ public class TestShareLibService extends XFsTestCase { PigActionExecutor ae = new PigActionExecutor(); Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml); ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf); + verifyFilesInDistributedCache(DistributedCache.getCacheFiles(jobConf), "MyPig.jar", "MyOozie.jar"); - URI[] cacheFiles = DistributedCache.getCacheFiles(jobConf); - String cacheFilesStr = Arrays.toString(cacheFiles); - assertTrue(cacheFilesStr.contains("MyPig.jar")); - assertTrue(cacheFilesStr.contains("MyOozie.jar")); - // Hadoop 2 has two extra jars - if (cacheFiles.length == 4) { - assertTrue(cacheFilesStr.contains("MRAppJar.jar")); - assertTrue(cacheFilesStr.contains("hadoop-mapreduce-client-jobclient-")); - } - else { - assertEquals(2, cacheFiles.length); - } } finally { services.destroy(); @@ -225,14 +225,15 @@ public class TestShareLibService extends XFsTestCase { public void testAddShareLib_pig_withVersion() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); FileSystem fs = getFileSystem(); Date time = new Date(System.currentTimeMillis()); - Path basePath = new Path(services.getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path libpath = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + ShareLibService.dateFormat.format(time)); + Path basePath = new Path(services.get(ConfigurationService.class).getConf() + .get(WorkflowAppService.SYSTEM_LIB_PATH)); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); fs.mkdirs(libpath); Path pigPath = new Path(libpath.toString() + 
Path.SEPARATOR + "pig"); @@ -263,19 +264,9 @@ public class TestShareLibService extends XFsTestCase { jobConf.set("oozie.action.sharelib.for.pig", "pig_10"); ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf); - URI[] cacheFiles = DistributedCache.getCacheFiles(jobConf); - String cacheFilesStr = Arrays.toString(cacheFiles); - assertTrue(cacheFilesStr.contains("MyPig.jar")); - assertTrue(cacheFilesStr.contains("MyOozie.jar")); - assertTrue(cacheFilesStr.contains("pig-10.jar")); - // Hadoop 2 has two extra jars - if (cacheFiles.length == 5) { - assertTrue(cacheFilesStr.contains("MRAppJar.jar")); - assertTrue(cacheFilesStr.contains("hadoop-mapreduce-client-jobclient-")); - } - else { - assertEquals(3, cacheFiles.length); - } + verifyFilesInDistributedCache(DistributedCache.getCacheFiles(jobConf), "MyPig.jar", "MyOozie.jar", + "pig-10.jar"); + } finally { services.destroy(); @@ -286,12 +277,13 @@ public class TestShareLibService extends XFsTestCase { public void testPurgeShareLib() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); FileSystem fs = getFileSystem(); long expiryTime = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert( - services.getConf().getInt(ShareLibService.LAUNCHERJAR_LIB_RETENTION, 7), TimeUnit.DAYS); + services.get(ConfigurationService.class).getConf() + .getInt(ShareLibService.LAUNCHERJAR_LIB_RETENTION, 7), TimeUnit.DAYS); // for directory created 8 days back to be deleted String expireTs = dt.format(new Date(expiryTime - TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS))); @@ -300,10 +292,11 @@ public class TestShareLibService extends XFsTestCase { // for directory created 5 days back NOT to be deleted String noexpireTs1 = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS))); - Path basePath = new 
Path(services.getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path expirePath = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + expireTs); - Path noexpirePath = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + noexpireTs); - Path noexpirePath1 = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + noexpireTs1); + Path basePath = new Path(services.get(ConfigurationService.class).getConf() + .get(WorkflowAppService.SYSTEM_LIB_PATH)); + Path expirePath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + expireTs); + Path noexpirePath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + noexpireTs); + Path noexpirePath1 = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + noexpireTs1); createDirs(fs, expirePath, noexpirePath, noexpirePath1); try { @@ -322,13 +315,14 @@ public class TestShareLibService extends XFsTestCase { public void testPurgeLauncherJar() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); FileSystem fs = getFileSystem(); long expiryTime = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert( - services.getConf().getInt(ShareLibService.LAUNCHERJAR_LIB_RETENTION, 7), TimeUnit.DAYS); + services.get(ConfigurationService.class).getConf() + .getInt(ShareLibService.LAUNCHERJAR_LIB_RETENTION, 7), TimeUnit.DAYS); // for directory created 8 days back to be deleted String expireTs = dt.format(new Date(expiryTime - TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS))); @@ -337,11 +331,12 @@ public class TestShareLibService extends XFsTestCase { // for directory created 5 days back NOT to be deleted String noexpireTs1 = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS))); - Path basePath = new Path(services.getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path expirePath = new Path(basePath, 
ShareLibService.LAUNCHER_PREFIX + expireTs); - Path noexpirePath = new Path(basePath, ShareLibService.LAUNCHER_PREFIX + noexpireTs); + Path basePath = new Path(services.get(ConfigurationService.class).getConf() + .get(WorkflowAppService.SYSTEM_LIB_PATH)); + Path expirePath = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + expireTs); + Path noexpirePath = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + noexpireTs); - Path noexpirePath1 = new Path(basePath, ShareLibService.LAUNCHER_PREFIX + noexpireTs1); + Path noexpirePath1 = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + noexpireTs1); createDirs(fs, expirePath, noexpirePath, noexpirePath1); try { @@ -362,24 +357,26 @@ public class TestShareLibService extends XFsTestCase { public void testPurgeJar() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); final FileSystem fs = getFileSystem(); // for directory created 8 days back to be deleted long expiryTime = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert( - services.getConf().getInt(ShareLibService.LAUNCHERJAR_LIB_RETENTION, 7), TimeUnit.DAYS); + services.get(ConfigurationService.class).getConf() + .getInt(ShareLibService.LAUNCHERJAR_LIB_RETENTION, 7), TimeUnit.DAYS); String expireTs = dt.format(new Date(expiryTime - TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS))); String expireTs1 = dt.format(new Date(expiryTime - TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS))); String noexpireTs = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS))); String noexpireTs1 = dt.format(new Date(expiryTime + TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS))); - final Path basePath = new Path(services.getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); + final Path basePath = new Path(services.get(ConfigurationService.class).getConf() + 
.get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path expirePath = new Path(basePath, ShareLibService.LAUNCHER_PREFIX + expireTs); - Path expirePath1 = new Path(basePath, ShareLibService.LAUNCHER_PREFIX + expireTs1); - Path noexpirePath = new Path(basePath, ShareLibService.LAUNCHER_PREFIX + noexpireTs); - Path noexpirePath1 = new Path(basePath, ShareLibService.LAUNCHER_PREFIX + noexpireTs1); + Path expirePath = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + expireTs); + Path expirePath1 = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + expireTs1); + Path noexpirePath = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + noexpireTs); + Path noexpirePath1 = new Path(basePath, ShareLibService.LAUNCHER_LIB_PREFIX + noexpireTs1); createDirs(fs, expirePath, expirePath1, noexpirePath, noexpirePath1); try { @@ -407,7 +404,8 @@ public class TestShareLibService extends XFsTestCase { services = new Services(); setSystemProps(); FileSystem fs = getFileSystem(); - Path basePath = new Path(services.getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); + Path basePath = new Path(services.get(ConfigurationService.class).getConf() + .get(WorkflowAppService.SYSTEM_LIB_PATH)); // Use basepath if there is no timestamped directory fs.mkdirs(basePath); @@ -428,11 +426,12 @@ public class TestShareLibService extends XFsTestCase { services = new Services(); setSystemProps(); FileSystem fs = getFileSystem(); - Path basePath = new Path(services.getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); + Path basePath = new Path(services.get(ConfigurationService.class).getConf() + .get(WorkflowAppService.SYSTEM_LIB_PATH)); // Use timedstamped directory if available Date time = new Date(System.currentTimeMillis()); - Path libpath = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + ShareLibService.dateFormat.format(time)); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); fs.mkdirs(libpath); Path pigPath = new 
Path(libpath.toString() + Path.SEPARATOR + "pig"); @@ -458,16 +457,17 @@ public class TestShareLibService extends XFsTestCase { public void testShareLib() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); FileSystem fs = getFileSystem(); String dir1 = dt.format(new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS))); String dir2 = dt.format(new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS))); String dir3 = dt.format(new Date(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(3, TimeUnit.DAYS))); - Path basePath = new Path(services.getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path path1 = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + dir1); - Path path2 = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + dir2); - Path path3 = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + dir3); + Path basePath = new Path(services.get(ConfigurationService.class).getConf() + .get(WorkflowAppService.SYSTEM_LIB_PATH)); + Path path1 = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + dir1); + Path path2 = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + dir2); + Path path3 = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + dir3); createDirs(fs, path1, path2, path3); createFile(path1.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig.jar"); try { @@ -486,7 +486,7 @@ public class TestShareLibService extends XFsTestCase { FileSystem fs = getFileSystem(); setSystemProps(); createTestShareLibMetaFile(fs); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHARELIB_MAPPING_FILE, fs.getUri() + "/user/test/config.properties"); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); try { @@ -510,8 
+510,9 @@ public class TestShareLibService extends XFsTestCase { FileSystem fs = getFileSystem(); Date time = new Date(System.currentTimeMillis()); - Path basePath = new Path(services.getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path libpath = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + Path basePath = new Path(services.get(ConfigurationService.class).getConf() + .get(WorkflowAppService.SYSTEM_LIB_PATH)); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); fs.mkdirs(libpath); @@ -543,18 +544,9 @@ public class TestShareLibService extends XFsTestCase { Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml); jobConf.set("oozie.action.sharelib.for.pig", "pig_10"); ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf); - URI[] cacheFiles = DistributedCache.getCacheFiles(jobConf); - String cacheFilesStr = Arrays.toString(cacheFiles); - assertTrue(cacheFilesStr.contains("pig-10.jar")); - assertTrue(cacheFilesStr.contains("oozie_luncher.jar")); - // Hadoop 2 has two extra jars - if (cacheFiles.length == 4) { - assertTrue(cacheFilesStr.contains("MRAppJar.jar")); - assertTrue(cacheFilesStr.contains("hadoop-mapreduce-client-jobclient-")); - } - else { - assertEquals(2, cacheFiles.length); - } + + verifyFilesInDistributedCache(DistributedCache.getCacheFiles(jobConf), "pig-10.jar", "oozie_luncher.jar"); + } finally { services.destroy(); @@ -567,7 +559,7 @@ public class TestShareLibService extends XFsTestCase { services = new Services(); createTestShareLibMetaFile_multipleFile(fs); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHARELIB_MAPPING_FILE, fs.getUri() + "/user/test/config.properties"); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); try { @@ -586,13 +578,14 @@ public class TestShareLibService extends XFsTestCase { public void 
testMultipleLauncherCall() throws Exception { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); try { FileSystem fs = getFileSystem(); Date time = new Date(System.currentTimeMillis()); - Path basePath = new Path(services.getConf().get(WorkflowAppService.SYSTEM_LIB_PATH)); - Path libpath = new Path(basePath, ShareLibService.SHARED_LIB_PREFIX + Path basePath = new Path(services.get(ConfigurationService.class).getConf() + .get(WorkflowAppService.SYSTEM_LIB_PATH)); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); fs.mkdirs(libpath); Path ooziePath = new Path(libpath.toString() + Path.SEPARATOR + "oozie"); @@ -600,7 +593,6 @@ public class TestShareLibService extends XFsTestCase { createFile(libpath.toString() + Path.SEPARATOR + "oozie" + Path.SEPARATOR + "oozie_luncher.jar"); services.init(); ShareLibService shareLibService = Services.get().get(ShareLibService.class); - shareLibService.init(services); List<Path> launcherPath = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR); assertEquals(launcherPath.size(), 2); launcherPath = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR); @@ -620,7 +612,7 @@ public class TestShareLibService extends XFsTestCase { services = new Services(); setSystemProps(); - Configuration conf = services.getConf(); + Configuration conf = services.get(ConfigurationService.class).getConf(); conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); services.init(); FileSystem fs = getFileSystem(); @@ -631,7 +623,13 @@ public class TestShareLibService extends XFsTestCase { Path basePath = new Path(testPath + Path.SEPARATOR + "testPath"); Path basePath1 = new Path(testPath + Path.SEPARATOR + "testPath1"); + Path hive_site = new Path(basePath.toString() + Path.SEPARATOR + "hive_conf" + 
Path.SEPARATOR + + "hive-site.xml"); + Path hive_site1 = new Path(basePath.toString() + Path.SEPARATOR + "hive_conf" + Path.SEPARATOR + + "hive-site1.xml"); Path symlink = new Path("symlink/"); + Path symlink_hive_site = new Path("symlink/hive_conf" + Path.SEPARATOR + "hive-site.xml"); + fs.mkdirs(basePath); createFile(basePath.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig.jar"); @@ -641,10 +639,15 @@ public class TestShareLibService extends XFsTestCase { createFile(basePath1.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig_3.jar"); createFile(basePath1.toString() + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig_4.jar"); + createFile(hive_site.toString()); + HadoopShims fileSystem = new HadoopShims(fs); fileSystem.createSymlink(basePath, symlink, true); + fileSystem.createSymlink(hive_site, symlink_hive_site, true); prop.put(ShareLibService.SHARE_LIB_CONF_PREFIX + ".pig", "/user/test/" + symlink.toString()); + prop.put(ShareLibService.SHARE_LIB_CONF_PREFIX + ".hive_conf", "/user/test/" + symlink_hive_site.toString() + + "#hive-site.xml"); createTestShareLibMetaFile(fs, prop); assertEquals(fileSystem.isSymlink(symlink), true); @@ -652,13 +655,19 @@ public class TestShareLibService extends XFsTestCase { conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); try { ShareLibService shareLibService = Services.get().get(ShareLibService.class); - shareLibService.init(services); assertEquals(shareLibService.getShareLibJars("pig").size(), 2); + assertEquals(shareLibService.getShareLibJars("hive_conf").size(), 1); new HadoopShims(fs).createSymlink(basePath1, symlink, true); + new HadoopShims(fs).createSymlink(hive_site1, symlink_hive_site, true); + assertEquals(new HadoopShims(fs).getSymLinkTarget(shareLibService.getShareLibJars("hive_conf").get(0)), + hive_site1); assertEquals(shareLibService.getShareLibJars("pig").size(), 3); } finally { fs.delete(new Path("shareLibPath/"), true); + fs.delete(new Path(metaFile), true); + fs.delete(new 
Path("/user/test/config.properties"), true); + fs.delete(symlink, true); services.destroy(); } @@ -668,10 +677,74 @@ public class TestShareLibService extends XFsTestCase { } } - private void createFile(String filename) throws IOException { - Path path = new Path(filename); - FSDataOutputStream out = getFileSystem().create(path); - out.close(); + @Test + public void testDuplicateJarsInDistributedCache() throws Exception { + services = new Services(); + setSystemProps(); + FileSystem fs = getFileSystem(); + Path basePath = new Path(services.get(ConfigurationService.class).getConf() + .get(WorkflowAppService.SYSTEM_LIB_PATH)); + Configuration conf = services.get(ConfigurationService.class).getConf(); + conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); + + // Use timestamped directory if available + Date time = new Date(System.currentTimeMillis()); + Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time)); + + Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig"); + createDirs(fs, pigPath, new Path(pigPath, "temp")); + createFile(new Path(pigPath, "pig.jar")); + createFile(new Path(pigPath, "hive.jar")); + createFile(new Path(new Path(pigPath, "temp"), "pig.jar#pig.jar")); + + try { + + // DistributedCache should have only one pig jar + verifyFilesInDistributedCache(setUpPigJob(true), "pig.jar", "hive.jar", "MyOozie.jar", "MyPig.jar"); + ShareLibService shareLibService = services.get(ShareLibService.class); + // sharelib service should list all three files (both pig jars and hive.jar) + List<Path> shareLib = shareLibService.getShareLibJars("pig"); + assertEquals(shareLib.size(), 3); + assertTrue(shareLib.toString().contains("pig.jar#pig.jar")); + assertTrue(shareLib.toString().contains("hive.jar")); + } + finally { + services.destroy(); + } + } + + private URI[] setUpPigJob(boolean useSystemSharelib) throws Exception { + services.init(); + String actionXml = "<pig>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + 
"<name-node>" + + getNameNodeUri() + "</name-node></pig>"; + Element eActionXml = XmlUtils.parseXml(actionXml); + XConfiguration protoConf = new XConfiguration(); + protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + WorkflowJobBean wfj = new WorkflowJobBean(); + protoConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, useSystemSharelib); + wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString()); + wfj.setConf(XmlUtils.prettyPrint(protoConf).toString()); + + Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean()); + PigActionExecutor ae = new PigActionExecutor(); + Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml); + jobConf.set("oozie.action.sharelib.for.pig", "pig"); + ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf); + return DistributedCache.getCacheFiles(jobConf); + } + + private void createFile(String... filenames) throws IOException { + for (String filename : filenames) { + Path path = new Path(filename); + createFile(path); + } + } + + private void createFile(Path... 
paths) throws IOException { + for (Path path : paths) { + FSDataOutputStream out = getFileSystem().create(path); + out.close(); + } } private void createTestShareLibMetaFile(FileSystem fs) { @@ -702,7 +775,7 @@ public class TestShareLibService extends XFsTestCase { private void createTestShareLibMetaFile(FileSystem fs, Properties prop) { try { - FSDataOutputStream out = fs.create(new Path("/user/test/config.properties")); + FSDataOutputStream out = fs.create(new Path(metaFile)); prop.store(out, null); out.close(); @@ -731,7 +804,7 @@ public class TestShareLibService extends XFsTestCase { + Path.SEPARATOR + "pig" + Path.SEPARATOR + "pig.jar," + "/user/test/" + somethingNew.toString() + Path.SEPARATOR + "somethingNew" + Path.SEPARATOR + "somethingNew.jar"); - FSDataOutputStream out = fs.create(new Path("/user/test/config.properties")); + FSDataOutputStream out = fs.create(new Path(metaFile)); prop.store(out, null); out.close(); @@ -748,4 +821,152 @@ public class TestShareLibService extends XFsTestCase { } } + @Test + public void testConfFileAddedToActionConf() throws Exception { + + try { + + XConfiguration protoConf = new XConfiguration(); + protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + WorkflowJobBean wfj = new WorkflowJobBean(); + protoConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true); + wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString()); + wfj.setConf(XmlUtils.prettyPrint(protoConf).toString()); + + Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean()); + + // Test hive-site.xml in sharelib cache + setupSharelibConf("hive-site.xml", "oozie.hive_conf"); + ShareLibService shareLibService = services.get(ShareLibService.class); + assertEquals(shareLibService.getShareLibConfigMap().get("hive_conf").values().size(), 1); + assertEquals( + shareLibService.getShareLibConfigMap().get("hive_conf").keySet().toArray(new Path[] {})[0] + .getName(), + "hive-site.xml"); + + // Test hive-site.xml not in 
distributed cache + setupSharelibConf("hive-site.xml", "oozie.hive_conf"); + String actionXml = "<hive>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + + getNameNodeUri() + "</name-node>" + "<script>test</script>" + "</hive>"; + Element eActionXml = XmlUtils.parseXml(actionXml); + + HiveActionExecutor ae = new HiveActionExecutor(); + Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml); + + Configuration actionConf = ae.createBaseHadoopConf(context, eActionXml); + jobConf.set("oozie.action.sharelib.for.hive", "hive_conf"); + ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf); + URI[] cacheFiles = DistributedCache.getCacheFiles(actionConf); + String cacheFilesStr = Arrays.toString(cacheFiles); + assertFalse(cacheFilesStr.contains("hive-site.xml")); + + // Test hive-site.xml property in jobconf with linkname + jobConf = ae.createBaseHadoopConf(context, eActionXml); + Properties prop = new Properties(); + actionConf = ae.createBaseHadoopConf(context, eActionXml); + prop.put("oozie.hive_conf", "/user/test/" + sharelibPath + "/hive-site.xml#hive-site.xml"); + setupSharelibConf("hive-site.xml", "oozie.hive_conf", prop); + jobConf.set("oozie.action.sharelib.for.hive", "hive_conf"); + ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf); + assertEquals(jobConf.get("oozie.hive_conf-sharelib-test"), "test"); + + } + finally { + getFileSystem().delete(new Path(sharelibPath), true); + services.destroy(); + } + } + + @Test + public void testConfFileAddedToDistributedCache() throws Exception { + try { + + Properties prop = new Properties(); + prop.put("oozie.hive_conf", "/user/test/" + sharelibPath + "/hive-site.xml#hive-site.xml"); + setupSharelibConf("hive-site.xml", "oozie.hive_conf", prop); + + String actionXml = "<pig>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + + getNameNodeUri() + "</name-node>" + "<script>test</script>" + 
"</pig>"; + Element eActionXml = XmlUtils.parseXml(actionXml); + XConfiguration protoConf = new XConfiguration(); + protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + WorkflowJobBean wfj = new WorkflowJobBean(); + protoConf.setBoolean(OozieClient.USE_SYSTEM_LIBPATH, true); + wfj.setProtoActionConf(XmlUtils.prettyPrint(protoConf).toString()); + wfj.setConf(XmlUtils.prettyPrint(protoConf).toString()); + + Context context = new TestJavaActionExecutor().new Context(wfj, new WorkflowActionBean()); + + PigActionExecutor ae = new PigActionExecutor(); + Configuration jobConf = ae.createBaseHadoopConf(context, eActionXml); + jobConf.set("oozie.action.sharelib.for.pig", "hive_conf"); + ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf); + URI[] cacheFiles = DistributedCache.getCacheFiles(jobConf); + String cacheFilesStr = Arrays.toString(cacheFiles); + assertEquals(jobConf.get("oozie.hive_conf-sharelib-test"), null); + assertTrue(URLDecoder.decode(cacheFilesStr).contains("hive-site.xml#hive-site.xml")); + + setupSharelibConf("hbase-site.xml", "oozie.hbase_conf"); + jobConf = ae.createBaseHadoopConf(context, eActionXml); + jobConf.set("oozie.action.sharelib.for.pig", "hbase_conf"); + ae.setLibFilesArchives(context, eActionXml, new Path("hdfs://dummyAppPath"), jobConf); + cacheFiles = DistributedCache.getCacheFiles(jobConf); + cacheFilesStr = Arrays.toString(cacheFiles); + assertTrue(cacheFilesStr.contains("hbase-site.xml")); + + } + finally { + getFileSystem().delete(new Path(sharelibPath), true); + services.destroy(); + } + } + + private void setupSharelibConf(final String file, final String tag) throws ServiceException, IOException { + Properties prop = new Properties(); + prop.put(tag, "/user/test/" + sharelibPath); + setupSharelibConf(file, tag, prop); + + } + + private void setupSharelibConf(final String file, final String tag, Properties prop) throws IOException, + ServiceException { + services = new Services(); + 
setSystemProps(); + Configuration conf = services.get(ConfigurationService.class).getConf(); + + conf.set(ShareLibService.SHIP_LAUNCHER_JAR, "true"); + conf.set(ShareLibService.SHARELIB_MAPPING_FILE, getFileSystem().getUri() + "/user/test/config.properties"); + + XConfiguration hiveConf = new XConfiguration(); + hiveConf.set(tag + "-sharelib-test", "test"); + createDirs(getFileSystem(), new Path(sharelibPath)); + FSDataOutputStream out = getFileSystem().create(new Path(sharelibPath, file)); + PrintWriter bufOut = new PrintWriter(out); + bufOut.write(hiveConf.toXmlString(false)); + bufOut.close(); + createTestShareLibMetaFile(getFileSystem(), prop); + + services.init(); + } + + private void verifyFilesInDistributedCache(URI[] cacheFiles, String... files) { + + String cacheFilesStr = Arrays.toString(cacheFiles); + if (new HadoopShims(getFileSystem()).isYARN()) { + // Hadoop 2 has two extra jars + assertEquals(cacheFiles.length, files.length + 2); + assertTrue(cacheFilesStr.contains("MRAppJar.jar")); + assertTrue(cacheFilesStr.contains("hadoop-mapreduce-client-jobclient-")); + + } + else { + assertEquals(cacheFiles.length, files.length); + } + for (String file : files) { + assertTrue(cacheFilesStr.contains(file)); + + } + + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/docs/src/site/twiki/WorkflowFunctionalSpec.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki index 21199e8..f9ec18c 100644 --- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki +++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki @@ -2482,6 +2482,8 @@ classpath/libpath for all its actions. A workflow job can specify a share library path using the job property =oozie.libpath=. A workflow job can use the system share library by setting the job property =oozie.use.system.libpath= to =true=. 
+=oozie.use.system.libpath= can also be configured in the action configuration. +=oozie.use.system.libpath= defined at the action level overrides the job property. ---+++ 17.1 Action Share Library Override (since Oozie 3.3) http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java ---------------------------------------------------------------------- diff --git a/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java b/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java index 9a19770..559ce8e 100644 --- a/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java +++ b/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java @@ -44,4 +44,8 @@ public class HadoopShims { public void createSymlink(Path target, Path link, boolean createParent) throws IOException { } + public boolean isYARN() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java ---------------------------------------------------------------------- diff --git a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java index bea4781..559ce8e 100644 --- a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java +++ b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; - public class HadoopShims { FileSystem fs; @@ -45,4 +44,8 @@ public class HadoopShims { public void createSymlink(Path target, Path link, boolean createParent) throws IOException { } + public boolean isYARN() { + 
return false; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/87a6d053/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java ---------------------------------------------------------------------- diff --git a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java index acebd60..d7b4051 100644 --- a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java +++ b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/hadoop/utils/HadoopShims.java @@ -20,7 +20,9 @@ package org.apache.oozie.hadoop.utils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import java.io.IOException; +import java.net.URI; public class HadoopShims { FileSystem fs; @@ -34,15 +36,33 @@ public class HadoopShims { } public Path getSymLinkTarget(Path p) throws IOException { - return fs.getFileLinkStatus(p).getSymlink(); + try { + //getSymlink doesn't work with fragment name, need to remove fragment before calling getSymlink + Path tempPath = new URI(p.toString()).getFragment() == null ? p : new Path(new URI(p.toString()).getPath()); + return fs.getFileLinkStatus(tempPath).getSymlink(); + } + catch (java.net.URISyntaxException e) { + throw new IOException(e); + } } public boolean isSymlink(Path p) throws IOException { - return fs.getFileLinkStatus(p).isSymlink(); + try { + //isSymlink doesn't work with fragment name, need to remove fragment before checking for symlink + Path tempPath = new URI(p.toString()).getFragment() == null ? 
p : new Path(new URI(p.toString()).getPath()); + return fs.getFileLinkStatus(tempPath).isSymlink(); + } + catch (java.net.URISyntaxException e) { + throw new IOException(e); + } } public void createSymlink(Path target, Path link, boolean createParent) throws IOException { fs.createSymlink(target, link, createParent); } + public boolean isYARN() { + return true; + } + }
