Repository: falcon Updated Branches: refs/heads/master 237bab6ed -> 54402764c
FALCON-887 Support for multiple lib paths in falcon process Author: Sowmya Ramesh <[email protected]> Reviewers: Peeyush Bishnoi <[email protected]>, Pavan Kumar<[email protected]> Closes #38 from sowmyaramesh/FALCON-887 and squashes the following commits: 95865df [Sowmya Ramesh] Moved getting lib path to private method 3209a7e [Sowmya Ramesh] Applied feedback aa8f6a2 [Sowmya Ramesh] FALCON-887 Fixed checkstyle errors ad4d455 [Sowmya Ramesh] FALCON-887 Support for multiple lib paths in falcon process Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/54402764 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/54402764 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/54402764 Branch: refs/heads/master Commit: 54402764c124cd12645e24fa2d3cf2283f4c13bd Parents: 237bab6 Author: Sowmya Ramesh <[email protected]> Authored: Thu Feb 18 14:25:03 2016 +0530 Committer: peeyush b <[email protected]> Committed: Thu Feb 18 14:25:03 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/falcon/entity/EntityUtil.java | 1 + .../entity/parser/ProcessEntityParser.java | 9 +++- docs/src/site/twiki/EntitySpecification.twiki | 6 ++- .../apache/falcon/oozie/OozieBundleBuilder.java | 26 +++++++++-- .../ProcessExecutionWorkflowBuilder.java | 49 +++++++++++--------- 6 files changed, 63 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 297e508..6861519 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -32,6 +32,8 @@ Trunk FALCON-1770 Update README file (Ajay Yadava) BUG FIXES + FALCON-887 Support for multiple lib paths in falcon process (Sowmya Ramesh) + FALCON-1795 Kill api not killing waiting/ready instances 
FALCON-1804 Non-SLA feed throws NullPointerException. http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 24dbf3d..9c03de3 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -89,6 +89,7 @@ public final class EntityUtil { public static final String MR_JOB_PRIORITY = "jobPriority"; public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; + public static final String WF_LIB_SEPARATOR = ","; private static final String STAGING_DIR_NAME_SEPARATOR = "_"; /** Priority with which the DAG will be scheduled. http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java index 10a5265..eec6f69 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java @@ -173,8 +173,13 @@ public class ProcessEntityParser extends EntityParser<Process> { "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode); } - if (StringUtils.isNotEmpty(libPath) && !fs.exists(new Path(libPath))) { - throw new ValidationException("Lib path: " + libPath + " does not exists in HDFS: " + nameNode); + if (StringUtils.isNotBlank(libPath)) { + String[] libPaths = libPath.split(EntityUtil.WF_LIB_SEPARATOR); + for (String path : libPaths) { + if (!fs.exists(new Path(path))) { + throw new
ValidationException("Lib path: " + path + " does not exists in HDFS: " + nameNode); + } + } + } } catch (IOException e) { throw new FalconException("Error validating workflow path " + workflowPath, e); http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/docs/src/site/twiki/EntitySpecification.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki index 6f24d8f..b3d80e2 100644 --- a/docs/src/site/twiki/EntitySpecification.twiki +++ b/docs/src/site/twiki/EntitySpecification.twiki @@ -821,6 +821,7 @@ The following are some special properties, which when present are used by the Fa ---+++ Workflow The workflow defines the workflow engine that should be used and the path to the workflow on hdfs. +Required libraries can be specified with the lib attribute of the workflow element, as a comma-separated list of HDFS paths (do not put spaces around the commas). The workflow definition on hdfs contains the actual job that should run and it should confirm to the workflow specification of the engine specified. The libraries required by the workflow should be in lib folder inside the workflow path. @@ -840,7 +841,7 @@ Syntax: <verbatim> <process name="[process name]"> ... - <workflow engine=[workflow engine] path=[workflow path]/> + <workflow engine=[workflow engine] path=[workflow path] lib=[comma-separated lib paths]/> ... </process> </verbatim> @@ -856,6 +857,7 @@ Example: This defines the workflow engine to be oozie and the workflow xml is defined at /projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib. +The library path can be overridden with the lib attribute of the workflow element, e.g. lib="/projects/bootcamp/wf/libs,/projects/bootcamp/oozie/libs". ---++++ Pig @@ -865,7 +867,7 @@ Example: <verbatim> <process name="sample-process"> ...
- <workflow engine="pig" path="/projects/bootcamp/pig.script"/> + <workflow engine="pig" path="/projects/bootcamp/pig.script" lib="/projects/bootcamp/wf/libs,/projects/bootcamp/pig/libs"/> ... </process> </verbatim> http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java index af3f44d..5f93cc2 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java @@ -18,6 +18,7 @@ package org.apache.falcon.oozie; +import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; @@ -106,14 +107,33 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster)); //Add libpath - String libPath = getLibPath(buildPath); - if (libPath != null) { - properties.put(OozieClient.LIBPATH, getStoragePath(libPath)); + String libPath = getOozieLibPath(buildPath); + if (StringUtils.isNotBlank(libPath)) { + properties.put(OozieClient.LIBPATH, libPath); } return properties; } + private String getOozieLibPath(final Path buildPath) { + String path = getLibPath(buildPath); + if (StringUtils.isBlank(path)) { + return null; + } + StringBuilder storageLibPaths = new StringBuilder(); + String[] libPaths = path.split(EntityUtil.WF_LIB_SEPARATOR); + for (String libPath : libPaths) { + String tempPath = getStoragePath(libPath); + if (StringUtils.isNotBlank(tempPath)) { + if (StringUtils.isNotBlank(storageLibPaths)) { + storageLibPaths.append(EntityUtil.WF_LIB_SEPARATOR); + }
storageLibPaths.append(tempPath); + } + } + return StringUtils.isBlank(storageLibPaths) ? null : storageLibPaths.toString(); + } + protected CONFIGURATION getConfig(Properties props) { CONFIGURATION conf = new CONFIGURATION(); for (Entry<Object, Object> prop : props.entrySet()) { http://git-wip-us.apache.org/repos/asf/falcon/blob/54402764/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java index dc9349f..7d5b331 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java @@ -238,36 +238,39 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration protected void addArchiveForCustomJars(Cluster cluster, List<String> archiveList, String lib) throws FalconException { - if (StringUtils.isEmpty(lib)) { + if (StringUtils.isBlank(lib)) { return; } - Path libPath = new Path(lib); - try { - final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster)); - if (fs.isFile(libPath)) { // File, not a Dir - archiveList.add(libPath.toString()); - return; - } + String[] libPaths = lib.split(EntityUtil.WF_LIB_SEPARATOR); + for (String path : libPaths) { + Path libPath = new Path(path); + try { + final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( + ClusterHelper.getConfiguration(cluster)); + if (fs.isFile(libPath)) { // File, not a Dir + archiveList.add(libPath.toString()); + continue; // not return: the remaining lib paths must still be processed + } - // lib path is a directory, add each file under the lib dir to archive - final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() { - @Override - public
boolean accept(Path path) { - try { - return fs.isFile(path) && path.getName().endsWith(".jar"); - } catch (IOException ignore) { - return false; + // lib path is a directory, add each file under the lib dir to archive + final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() { + @Override + public boolean accept(Path path) { + try { + return fs.isFile(path) && path.getName().endsWith(".jar"); + } catch (IOException ignore) { + return false; + } } - } - }); + }); - for (FileStatus fileStatus : fileStatuses) { - archiveList.add(fileStatus.getPath().toString()); + for (FileStatus fileStatus : fileStatuses) { + archiveList.add(fileStatus.getPath().toString()); + } + } catch (IOException e) { + throw new FalconException("Error adding archive for custom jars under: " + libPath, e); } - } catch (IOException e) { - throw new FalconException("Error adding archive for custom jars under: " + libPath, e); } }
