Repository: incubator-falcon
Updated Branches:
  refs/heads/master ea8c7c6be -> 80314b0d6
FALCON-787 FalconCLI - Submit recipe failed. Contributed by Sowmya Ramesh

Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2a31462c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2a31462c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2a31462c

Branch: refs/heads/master
Commit: 2a31462c5672cff235bc821a465c9648d0045352
Parents: ea8c7c6
Author: Venkatesh Seetharam <venkat...@apache.org>
Authored: Mon Oct 13 10:44:58 2014 -0700
Committer: Venkatesh Seetharam <venkat...@apache.org>
Committed: Mon Oct 13 10:44:58 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 addons/recipes/hdfs-replication/README.txt      | 29 +++++++
 .../resources/hdfs-replication-template.xml     |  3 +-
 .../resources/hdfs-replication-workflow.xml     |  7 +-
 .../main/resources/hdfs-replication.properties  | 14 ++--
 .../org/apache/falcon/client/FalconClient.java  | 16 ++--
 .../org/apache/falcon/recipe/RecipeTool.java    | 80 ++++++++++++++------
 .../org/apache/falcon/entity/ProcessHelper.java |  5 +-
 .../entity/parser/ProcessEntityParser.java      |  3 +-
 .../org/apache/falcon/update/UpdateHelper.java  |  2 +-
 docs/src/site/twiki/recipes.twiki               | 14 +++-
 .../oozie/process/ProcessBundleBuilder.java     |  4 +-
 .../falcon/replication/FeedReplicator.java      |  4 +-
 13 files changed, 133 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9f79151..1896b4e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -115,6 +115,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS

   BUG FIXES
+    FALCON-787 FalconCLI - Submit recipe failed (Sowmya Ramesh via
+    Venkatesh Seetharam)
+
     FALCON-786 FalconAuditFilter - Arguments in wrong order
     (Venkatesh Seetharam)

     FALCON-789 Post processing is broken for Feeds (Sowmya Ramesh via


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/addons/recipes/hdfs-replication/README.txt
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/README.txt b/addons/recipes/hdfs-replication/README.txt
new file mode 100644
index 0000000..5742d43
--- /dev/null
+++ b/addons/recipes/hdfs-replication/README.txt
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HDFS Directory Replication Recipe
+
+Overview
+This recipe replicates arbitrary directories on HDFS from one Hadoop
+cluster to another. It piggybacks on the replication solution in Falcon,
+which uses the DistCp tool.
+
+Use Case
+* Copy directories without dated partitions between HDFS clusters
+* Archive directories from HDFS to cloud storage, e.g. S3, Azure WASB
+
+Limitations
+As the data volume and number of files grow, this can get inefficient.


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
index 67c82db..824e6f5 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
@@ -26,6 +26,7 @@
     </clusters>

     <parallel>1</parallel>
+    <!-- Dir replication needs to run only once to catch up -->
     <order>LAST_ONLY</order>
     <frequency>##process.frequency##</frequency>
     <timezone>UTC</timezone>
@@ -40,6 +41,6 @@
         <property name="##process.property7.name##" value="##process.property7.value##"/>
     </properties>

-    <workflow name="##workflow.name##" engine="oozie" path="##workflow.path##"/>
+    <workflow name="##workflow.name##" engine="oozie" path="##workflow.path##" lib="##workflow.lib.path##"/>
     <retry policy="periodic" delay="minutes(10)" attempts="3"/>
 </process>


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
index 6868011..841b9df 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
@@ -39,19 +39,24 @@
                 <name>oozie.use.system.libpath</name>
                 <value>true</value>
             </property>
+            <property>
+                <name>oozie.action.sharelib.for.java</name>
+                <value>distcp</value>
+            </property>
         </configuration>
         <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
         <arg>-Dmapred.job.queue.name=${queueName}</arg>
         <arg>-Dmapred.job.priority=${jobPriority}</arg>
         <arg>-maxMaps</arg>
         <arg>${maxMaps}</arg>
+        <arg>-mapBandwidthKB</arg>
+        <arg>${mapBandwidthKB}</arg>
         <arg>-sourcePaths</arg>
         <arg>${nameNode}${drSourceDir}</arg>
         <arg>-targetPath</arg>
         <arg>${drTargetClusterFS}${drTargetDir}</arg>
         <arg>-falconFeedStorageType</arg>
         <arg>FILESYSTEM</arg>
-        <file>${wf:conf("falcon.libpath")}/hadoop-distcp.jar</file>
     </java>
     <ok to="end"/>
     <error to="fail"/>


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
index 34d1843..29d6cab 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
@@ -16,6 +16,8 @@
 # limitations under the License.
 #

+##### NOTE: This is a TEMPLATE file which can be copied and edited
+
 ##### Recipe properties
 falcon.recipe.name=hdfs-replication

@@ -23,10 +25,10 @@ falcon.recipe.name=hdfs-replication
 ##### Workflow properties
 falcon.recipe.workflow.name=hdfs-dr-workflow
-# If artifacts are present on local FS, provide paths here
+# Provide the workflow absolute path. This can be an HDFS or local FS path. If the workflow is on the local FS it will be copied to HDFS
 falcon.recipe.workflow.path=/recipes/hdfs-replication/hdfs-replication-workflow.xml
-falcon.recipe.workflow.lib.path=/recipes/hdfs-replication/lib
-
+# Provide the workflow lib absolute path. This can be an HDFS or local FS path. If the libs are on the local FS they will be copied to HDFS
+#falcon.recipe.workflow.lib.path=/recipes/hdfs-replication/lib

 ##### Cluster properties

@@ -35,9 +37,9 @@ falcon.recipe.src.cluster.name=test
 # Change the src cluster hdfs write end point here. This is mandatory.
 falcon.recipe.src.cluster.hdfs.writeEndPoint=hdfs://sandbox.hortonworks.com:8020
 # Change the src cluster validity start time here
-falcon.recipe.src.cluster.validity.start=2012-04-20T00:00Z
+falcon.recipe.src.cluster.validity.start=2014-10-01T00:00Z
 # Change the src cluster validity end time here
-falcon.recipe.src.cluster.validity.end=2014-04-20T00:00Z
+falcon.recipe.src.cluster.validity.end=2016-12-30T00:00Z

 ##### Scheduling properties

@@ -59,5 +61,5 @@ falcon.recipe.process.property5.name=drTargetCluster
 falcon.recipe.process.property5.value=backupCluster
 falcon.recipe.process.property6.name=maxMaps
 falcon.recipe.process.property6.value=5
-falcon.recipe.process.property7.name=mapBandwidth
+falcon.recipe.process.property7.name=mapBandwidthKB
 falcon.recipe.process.property7.value=102400
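For orientation, the ##...## names that the recipe template references come from these properties with the falcon.recipe. prefix dropped; recipes.twiki further down gives falcon.recipe.workflow.name filling ##workflow.name## as its example. Below is a minimal, self-contained sketch of building such an overlay map. The class name, helper name, and the exact prefix-strip rule are illustrative assumptions, not RecipeTool's actual code:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    // Hypothetical illustration of the overlay described in recipes.twiki:
    // "falcon.recipe.workflow.name" supplies the value for ##workflow.name##.
    public final class OverlaySketch {
        private static final String RECIPE_PREFIX = "falcon.recipe."; // assumed prefix

        static Map<String, String> buildOverlay(Properties recipeProperties) {
            Map<String, String> overlay = new HashMap<String, String>();
            for (String key : recipeProperties.stringPropertyNames()) {
                if (key.startsWith(RECIPE_PREFIX)) {
                    // "falcon.recipe.workflow.name" becomes overlay key "workflow.name"
                    overlay.put(key.substring(RECIPE_PREFIX.length()),
                            recipeProperties.getProperty(key));
                }
            }
            return overlay;
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("falcon.recipe.workflow.name", "hdfs-dr-workflow");
            System.out.println(buildOverlay(props)); // {workflow.name=hdfs-dr-workflow}
        }
    }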

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index d73560d..7e46f28 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -1040,8 +1040,11 @@ public class FalconClient {

     public String submitRecipe(String recipeName,
                                String recipeToolClassName) throws FalconCLIException {
-        String recipePath = clientProperties.getProperty("falcon.recipe.path",
-                System.getProperty("falcon.home"));
+        String recipePath = clientProperties.getProperty("falcon.recipe.path");
+
+        if (StringUtils.isEmpty(recipePath)) {
+            throw new FalconCLIException("falcon.recipe.path is not set in client.properties");
+        }

         String recipeFilePath = recipePath + File.separator + recipeName + TEMPLATE_SUFFIX;
         File file = new File(recipeFilePath);
@@ -1055,7 +1058,7 @@ public class FalconClient {
             throw new FalconCLIException("Recipe properties file does not exist : " + propertiesFilePath);
         }

-        String processFile = null;
+        String processFile;
         try {
             String prefix = "falcon-recipe" + "-" + System.currentTimeMillis();
             File tmpPath = new File("/tmp");
@@ -1082,12 +1085,9 @@ public class FalconClient {
                 RecipeTool.main(args);
             }
             validate(EntityType.PROCESS.toString(), processFile);
-            String result = submitAndSchedule(EntityType.PROCESS.toString(), processFile);
-            return result + System.getProperty("line.separator") + "Submitted process entity: " + processFile;
+            return submitAndSchedule(EntityType.PROCESS.toString(), processFile);
         } catch (Exception e) {
-            String msg = (processFile == null) ? e.getMessage()
-                    : e.getMessage() + System.getProperty("line.separator") + "Submitted process entity: " + processFile;
-            throw new FalconCLIException(msg, e);
+            throw new FalconCLIException(e.getMessage(), e);
        }
    }
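To see why the fail-fast check matters: with the old fallback, an unset falcon.home system property made the lookup return null, and string concatenation then silently produced a "null/..." template path that only failed later with a confusing message. A standalone demonstration of both behaviors (this is not Falcon code; IllegalStateException stands in for FalconCLIException):

    import java.io.File;
    import java.util.Properties;

    // Demo of the failure mode the patch removes: with neither falcon.recipe.path
    // nor falcon.home set, the old lookup returned null and the path became "null/...".
    public class RecipePathLookupDemo {
        public static void main(String[] args) {
            Properties clientProperties = new Properties(); // falcon.recipe.path not set

            // Old behavior: fall back to an (assumed unset) system property, yielding null
            String oldRecipePath = clientProperties.getProperty("falcon.recipe.path",
                    System.getProperty("falcon.home"));
            System.out.println(oldRecipePath + File.separator + "hdfs-replication-template.xml");
            // prints "null/hdfs-replication-template.xml", a confusing downstream error

            // New behavior: fail fast with an actionable message
            String recipePath = clientProperties.getProperty("falcon.recipe.path");
            if (recipePath == null || recipePath.isEmpty()) {
                throw new IllegalStateException("falcon.recipe.path is not set in client.properties");
            }
        }
    }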

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
index bd91e8e..1a2e2f0 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
@@ -22,6 +22,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Option;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
@@ -71,17 +72,20 @@ public class RecipeTool extends Configured implements Tool {
         Properties recipeProperties = loadProperties(recipePropertiesFilePath);
         validateProperties(recipeProperties);

-        validateArtifacts(recipeProperties);
-        String recipeName = FilenameUtils.getBaseName(recipePropertiesFilePath);
-
         FileSystem fs = getFileSystemForHdfs(recipeProperties);
+
+        validateArtifacts(recipeProperties, fs);
+
+        String recipeName = FilenameUtils.getBaseName(recipePropertiesFilePath);
         copyFilesToHdfsIfRequired(recipeProperties, fs, recipeName);

         Map<String, String> overlayMap = getOverlay(recipeProperties);
-        overlayParametersOverTemplate(argMap.get(RecipeToolArgs.RECIPE_FILE_ARG), argMap.get(RecipeToolArgs
-                .RECIPE_PROCESS_XML_FILE_PATH_ARG), overlayMap);
+        String processFilename = overlayParametersOverTemplate(argMap.get(RecipeToolArgs.RECIPE_FILE_ARG),
+                argMap.get(RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG), overlayMap);
+        System.out.println("Generated process file to be scheduled: ");
+        System.out.println(FileUtils.readFileToString(new File(processFilename)));

-        System.out.println("Completed disaster recovery");
+        System.out.println("Completed recipe processing");

         return 0;
     }
@@ -130,19 +134,24 @@ public class RecipeTool extends Configured implements Tool {
         }
     }

-    private static void validateArtifacts(final Properties recipeProperties) throws Exception{
+    private static void validateArtifacts(final Properties recipeProperties, final FileSystem fs) throws Exception{
         // validate the WF path
         String wfPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName());

-        // If the file doesn't exist locally throw exception
-        if (!StringUtils.isEmpty(wfPath) && !doesFileExist(wfPath)) {
-            throw new Exception("Recipe workflow file does not exist : " + wfPath);
+        // Check if file exists on HDFS
+        if (StringUtils.isNotEmpty(wfPath) && !fs.exists(new Path(wfPath))) {
+            // If the file doesn't exist locally throw exception
+            if (!doesFileExist(wfPath)) {
+                throw new Exception("Recipe workflow file does not exist : " + wfPath + " on local FS or HDFS");
+            }
         }

         // validate lib path
         String libPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_LIB_PATH.getName());
-        if (!StringUtils.isEmpty(libPath) && !doesFileExist(libPath)) {
-            throw new Exception("Recipe lib file path does not exist : " + libPath);
+        if (StringUtils.isNotEmpty(libPath) && !fs.exists(new Path(libPath))) {
+            if (!doesFileExist(libPath)) {
+                throw new Exception("Recipe lib file path does not exist : " + libPath + " on local FS or HDFS");
+            }
         }
     }
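The validation order introduced above (HDFS first, then the local filesystem) can be exercised in isolation. A minimal sketch of the same two-step check; FileSystem.getLocal stands in for the proxied HDFS handle RecipeTool actually obtains, and the path in main is just a placeholder:

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Isolated sketch of the artifact check above: accept the path if it
    // exists on HDFS, otherwise require that it exists on the local FS.
    public class ArtifactCheckSketch {
        static void validate(String wfPath, FileSystem fs) throws Exception {
            if (wfPath != null && !wfPath.isEmpty() && !fs.exists(new Path(wfPath))) {
                if (!new File(wfPath).exists()) {
                    throw new Exception("Recipe workflow file does not exist : "
                            + wfPath + " on local FS or HDFS");
                }
            }
        }

        public static void main(String[] args) throws Exception {
            // Local FS stands in for the HDFS FileSystem used by RecipeTool
            FileSystem fs = FileSystem.getLocal(new Configuration());
            validate("/tmp/hdfs-replication-workflow.xml", fs); // throws unless present
        }
    }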
@@ -177,7 +186,8 @@ public class RecipeTool extends Configured implements Tool {
                     String variable = line.substring(matcher.start(), matcher.end());
                     String paramString = overlay.get(variable.substring(2, variable.length() - 2));
                     if (paramString == null) {
-                        throw new Exception("Match not found for the template: " + variable);
+                        throw new Exception("Match not found for the template: " + variable
+                                + ". Please add it in recipe properties file");
                     }
                     line = line.replace(variable, paramString);
                     matcher = RECIPE_VAR_PATTERN.matcher(line);
@@ -195,39 +205,50 @@ public class RecipeTool extends Configured implements Tool {

     private static void copyFilesToHdfsIfRequired(final Properties recipeProperties,
                                                   final FileSystem fs,
                                                   final String recipeName) throws Exception {
+
+        String hdfsPath = HDFS_WF_PATH + recipeName + File.separator;
+
         String recipeWfPathName = RecipeToolOptions.WORKFLOW_PATH.getName();
         String wfPath = recipeProperties.getProperty(recipeWfPathName);
         String wfPathValue;
-        String hdfsPath = HDFS_WF_PATH + recipeName + File.separator;

-        if (!StringUtils.isEmpty(wfPath)) {
+        // Copy only if files are on local FS
+        if (StringUtils.isNotEmpty(wfPath) && !fs.exists(new Path(wfPath))) {
             createDirOnHdfs(hdfsPath, fs);
             if (new File(wfPath).isDirectory()) {
                 wfPathValue = hdfsPath + getLastPartOfPath(wfPath);
+                copyFileFromLocalToHdfs(wfPath, hdfsPath, true, wfPathValue, fs);
             } else {
                 wfPathValue = hdfsPath + new File(wfPath).getName();
+                copyFileFromLocalToHdfs(wfPath, hdfsPath, false, null, fs);
             }
-            copyFileFromLocalToHdfs(wfPath, hdfsPath, fs);
             // Update the property with the hdfs path
-            recipeProperties.setProperty(recipeWfPathName, wfPathValue);
-            System.out.println("recipeWfPathName: " + recipeProperties.getProperty(recipeWfPathName));
+            recipeProperties.setProperty(recipeWfPathName,
+                    fs.getFileStatus(new Path(wfPathValue)).getPath().toString());
+            System.out.println("Copied WF to: " + recipeProperties.getProperty(recipeWfPathName));
         }

         String recipeWfLibPathName = RecipeToolOptions.WORKFLOW_LIB_PATH.getName();
         String libPath = recipeProperties.getProperty(recipeWfLibPathName);
         String libPathValue;
-        if (!StringUtils.isEmpty(libPath)) {
+        // Copy only if files are on local FS
+        boolean isLibPathEmpty = StringUtils.isEmpty(libPath);
+        if (!isLibPathEmpty && !fs.exists(new Path(libPath))) {
             if (new File(libPath).isDirectory()) {
                 libPathValue = hdfsPath + getLastPartOfPath(libPath);
-                copyFileFromLocalToHdfs(libPath, hdfsPath, fs);
+                copyFileFromLocalToHdfs(libPath, hdfsPath, true, libPathValue, fs);
             } else {
                 libPathValue = hdfsPath + "lib" + File.separator + new File(libPath).getName();
-                copyFileFromLocalToHdfs(libPath, libPathValue, fs);
+                copyFileFromLocalToHdfs(libPath, libPathValue, false, null, fs);
             }
             // Update the property with the hdfs path
-            recipeProperties.setProperty(recipeWfLibPathName, libPathValue);
-            System.out.println("recipeWfLibPathName: " + recipeProperties.getProperty(recipeWfLibPathName));
+            recipeProperties.setProperty(recipeWfLibPathName,
+                    fs.getFileStatus(new Path(libPathValue)).getPath().toString());
+            System.out.println("Copied WF libs to: " + recipeProperties.getProperty(recipeWfLibPathName));
+        } else if (isLibPathEmpty) {
+            // Replace ##workflow.lib.path## with "" to ignore lib in workflow template
+            recipeProperties.setProperty(recipeWfLibPathName, "");
         }
     }
@@ -250,9 +271,18 @@ public class RecipeTool extends Configured implements Tool {

     private static void copyFileFromLocalToHdfs(final String localFilePath,
                                                 final String hdfsFilePath,
+                                                final boolean copyDir,
+                                                final String hdfsFileDirPath,
                                                 final FileSystem fs) throws IOException {
-        // For cases where validation of process entity file fails, the artifacts would have been already copied to
-        // HDFS. Set overwrite to true so that next submit recipe copies updated artifats from local FS to HDFS
+        /* If directory already exists and has contents, copyFromLocalFile with overwrite set to yes will fail with
+         * "Target is a directory". Delete the directory */
+        if (copyDir) {
+            Path hdfsPath = new Path(hdfsFileDirPath);
+            fs.delete(hdfsPath, true);
+        }
+
+        /* For cases where validation of process entity file fails, the artifacts would have been already copied to
+         * HDFS. Set overwrite to true so that next submit recipe copies updated artifacts from local FS to HDFS */
         fs.copyFromLocalFile(false, true, new Path(localFilePath), new Path(hdfsFilePath));
     }
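The delete-before-copy added to copyFileFromLocalToHdfs guards a real FileSystem quirk that the new comment names: copyFromLocalFile(delSrc, overwrite, src, dst) overwrites files, but when dst is an existing non-empty directory the copy can fail with "Target is a directory". A condensed sketch of that idempotent copy, with names shortened for illustration:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Condensed form of the idempotent copy used above: remove a previously
    // copied directory, then copy with overwrite=true so a re-submit after a
    // failed validation picks up updated local artifacts.
    final class IdempotentHdfsCopy {
        static void copy(FileSystem fs, String local, String hdfs,
                         boolean isDir, String hdfsDir) throws IOException {
            if (isDir) {
                fs.delete(new Path(hdfsDir), true); // recursive; no-op if absent
            }
            // delSrc=false keeps the local copy; overwrite=true replaces stale files
            fs.copyFromLocalFile(false, true, new Path(local), new Path(hdfs));
        }
    }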

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 8e0c87a..2565bf6 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -101,10 +101,11 @@ public final class ProcessHelper {
     public static Path getUserLibPath(Process process, org.apache.falcon.entity.v0.cluster.Cluster cluster,
                                       Path buildPath) throws FalconException {
         try {
-            if (process.getWorkflow().getLib() == null) {
+            String userLibPath = process.getWorkflow().getLib();
+            if (StringUtils.isEmpty(userLibPath)) {
                 return null;
             }
-            Path libPath = new Path(process.getWorkflow().getLib());
+            Path libPath = new Path(userLibPath);

             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index c2be6bd..9be4e85 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -18,6 +18,7 @@

 package org.apache.falcon.entity.parser;

+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -127,7 +128,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
                     "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
             }

-            if (libPath != null && !fs.exists(new Path(libPath))) {
+            if (StringUtils.isNotEmpty(libPath) && !fs.exists(new Path(libPath))) {
                 throw new ValidationException("Lib path: " + libPath + " does not exists in HDFS: " + nameNode);
             }
         } catch (IOException e) {


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index af93180..ac882d5 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -132,7 +132,7 @@ public final class UpdateHelper {
         //Get checksum from user wf/lib
         Map<String, String> wfPaths = checksumAndCopy(fs, new Path(process.getWorkflow().getPath()), null);
-        if (process.getWorkflow().getLib() != null) {
+        if (StringUtils.isNotEmpty(process.getWorkflow().getLib())) {
             wfPaths.putAll(checksumAndCopy(fs, new Path(process.getWorkflow().getLib()), null));
         }
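All three call sites above switch from a plain null check to StringUtils.isEmpty/isNotEmpty for the same reason: when a recipe has no libs, RecipeTool substitutes ##workflow.lib.path## with an empty string, so the generated process carries lib="" rather than no lib attribute at all. A tiny demonstration of why the null check alone lets that slip through (uses the same commons-lang StringUtils the diffs import):

    import org.apache.commons.lang.StringUtils;

    // Why "lib != null" is not enough: a lib-less recipe yields lib="" in the
    // generated process XML, and new Path("") would blow up downstream.
    public class EmptyLibPathDemo {
        public static void main(String[] args) {
            String lib = ""; // what ##workflow.lib.path## becomes for a lib-less recipe

            System.out.println(lib != null);              // true  -> old check proceeds
            System.out.println(StringUtils.isEmpty(lib)); // true  -> new check bails out
        }
    }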

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/docs/src/site/twiki/recipes.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/recipes.twiki b/docs/src/site/twiki/recipes.twiki
index e733889..c6cfec3 100644
--- a/docs/src/site/twiki/recipes.twiki
+++ b/docs/src/site/twiki/recipes.twiki
@@ -26,8 +26,8 @@ Falcon CLI functionality to support recipes has been added.
 <a href="./FalconCLI.html">Recipe command usage is defined here.</a>

 CLI accepts recipe option with a recipe name and optional tool and does the following:
-   * Validates the options, name option is mandatory and tool is optional and should be provided if user wants to override the base recipe tool
-   * Looks for <name>-tempalte.xml and <name>.properties file in the path specified by falcon.recipe.path in client.properties or falcon.home. If files cannot be found then Falcon CLI will fail
+   * Validates the options; the name option is mandatory, and tool is optional and should be provided if the user wants to override the base recipe tool
+   * Looks for <name>-workflow.xml, <name>-template.xml and <name>.properties files in the path specified by falcon.recipe.path in client.properties. If the files cannot be found then Falcon CLI will fail
    * Invokes a Tool to substitute the properties in the templated process for the recipe. By default invokes base tool if tool option is not passed. Tool is responsible for generating process entity at the path specified by FalconCLI
    * Validates the generated entity
    * Submit and schedule this entity

@@ -38,8 +38,8 @@ CLI accepts recipe option with a recipe name and optional tool and does the foll
 Falcon provides a base tool that recipes can override. Base Recipe tool does the following:
    * Expects recipe template file path, recipe properties file path and path where process entity to be submitted should be generated. Validates these arguments
    * Validates the artifacts i.e. workflow and/or lib files specified in the recipe template exists on local filesystem or HDFS at the specified path else returns error
-   * Copies if the artifacts exists only on local filesystem
-   * If artifacts already exists on HDFS then hdfs path should be included in recipe template file. If the artifacts are on local filesystem then falcon.recipe.workflow.path and falcon.recipe.workflow.lib.path are mandatory properties in recipe property file. Recipe tool will copy the local artifacts only if these properties are set in properties file
+   * Copies the artifacts if they exist on the local filesystem
+   * If the workflow is on the local FS then falcon.recipe.workflow.path in the recipe properties file is mandatory for it to be copied to HDFS. If the templated process requires custom libs, the falcon.recipe.workflow.lib.path property is mandatory for them to be copied from the local FS to HDFS. The recipe tool will copy the local artifacts only if these properties are set in the properties file
    * Looks for the pattern ##[A-Za-z0-9_.]*## in the templated process and substitutes it with the properties. Process entity generated after the substitution is written to the empty file passed by FalconCLI

 ---++ Recipe template file format
@@ -62,6 +62,12 @@ Recipe template will have <workflow name="##workflow.name##">. Recipe tool will
 look for the property "falcon.recipe.workflow.name" and replace it with the property value "hdfs-dr-workflow".
 Substituted template will have <workflow name="hdfs-dr-workflow">
 </verbatim>

+---++ Managing the scheduled recipe process
+   * Scheduled recipe process is similar to a regular process
+   * List   : falcon entity -type process -name <recipe-process-name> -list
+   * Status : falcon entity -type process -name <recipe-process-name> -status
+   * Delete : falcon entity -type process -name <recipe-process-name> -delete
+
 ---++ Sample recipes
    * Sample recipes are published in addons/recipes
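The ##[A-Za-z0-9_.]*## substitution the docs describe is easy to reproduce in isolation. A self-contained sketch of that loop, mirroring RecipeTool's overlayParametersOverTemplate (the pattern is taken from the docs above; the class name and main method are illustration only):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Reproduces the placeholder substitution described in recipes.twiki:
    // every ##key## token is replaced by overlay.get("key"), re-scanning the
    // line until no placeholders remain; an unknown key is an error.
    public class TemplateSubstitutionSketch {
        private static final Pattern RECIPE_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");

        static String substitute(String line, Map<String, String> overlay) throws Exception {
            Matcher matcher = RECIPE_VAR_PATTERN.matcher(line);
            while (matcher.find()) {
                String variable = line.substring(matcher.start(), matcher.end());
                String value = overlay.get(variable.substring(2, variable.length() - 2));
                if (value == null) {
                    throw new Exception("Match not found for the template: " + variable
                            + ". Please add it in recipe properties file");
                }
                line = line.replace(variable, value);
                matcher = RECIPE_VAR_PATTERN.matcher(line);
            }
            return line;
        }

        public static void main(String[] args) throws Exception {
            Map<String, String> overlay = new HashMap<String, String>();
            overlay.put("workflow.name", "hdfs-dr-workflow");
            System.out.println(substitute("<workflow name=\"##workflow.name##\"/>", overlay));
            // prints: <workflow name="hdfs-dr-workflow"/>
        }
    }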

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index ab38259..8f97ffa 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -18,6 +18,7 @@

 package org.apache.falcon.oozie.process;

+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
@@ -124,7 +125,8 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
         //Copy user workflow and lib to staging dir
         Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),
             new Path(buildPath, EntityUtil.PROCESS_USER_DIR));
-        if (entity.getWorkflow().getLib() != null && fs.exists(new Path(entity.getWorkflow().getLib()))) {
+        if (StringUtils.isNotEmpty(entity.getWorkflow().getLib())
+                && fs.exists(new Path(entity.getWorkflow().getLib()))) {
             checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getLib()),
                 new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR)));
         }


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2a31462c/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index d927fff..211c8bf 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -62,7 +62,9 @@ public class FeedReplicator extends Configured implements Tool {
         LOG.info("{} found conf ? {}", confPath, confPath.getFileSystem(conf).exists(confPath));
         conf.addResource(confPath);

-        final boolean includePathSet = !IGNORE.equalsIgnoreCase(conf.get("falcon.include.path"));
+        String includePathConf = conf.get("falcon.include.path");
+        final boolean includePathSet = (includePathConf != null)
+                && !IGNORE.equalsIgnoreCase(includePathConf);
         DistCp distCp = (includePathSet)
                 ? new CustomReplicator(conf, options)
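The FeedReplicator change is another null-safety fix: "IGNORE".equalsIgnoreCase(null) returns false, so the old expression reported a missing falcon.include.path as set. A standalone check of both expressions, using a plain String in place of the Hadoop Configuration lookup:

    // Demonstrates the bug fixed in FeedReplicator: equalsIgnoreCase(null) is
    // false, so the old expression treated an *unset* include path as set.
    public class IncludePathFlagDemo {
        private static final String IGNORE = "IGNORE";

        public static void main(String[] args) {
            String includePathConf = null; // conf.get("falcon.include.path") when unset

            boolean oldIncludePathSet = !IGNORE.equalsIgnoreCase(includePathConf);
            boolean newIncludePathSet = (includePathConf != null)
                    && !IGNORE.equalsIgnoreCase(includePathConf);

            System.out.println(oldIncludePathSet); // true  -- wrong: nothing was set
            System.out.println(newIncludePathSet); // false -- correct
        }
    }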