http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
new file mode 100644
index 0000000..c441998
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
@@ -0,0 +1,293 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-dr-hive-workflow'>
+    <start to='last-event'/>
+    <action name="last-event">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp,hive,hive2,hcatalog</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-falconLibPath</arg>
+            <arg>${wf:conf("falcon.libpath")}</arg>
+            <arg>-sourceCluster</arg>
+            <arg>${sourceCluster}</arg>
+            <arg>-sourceMetastoreUri</arg>
+            <arg>${sourceMetastoreUri}</arg>
+            <arg>-sourceHiveServer2Uri</arg>
+            <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
+            <arg>-sourceTable</arg>
+            <arg>${sourceTable}</arg>
+            <arg>-sourceStagingPath</arg>
+            <arg>${sourceStagingPath}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-targetCluster</arg>
+            <arg>${targetCluster}</arg>
+            <arg>-targetMetastoreUri</arg>
+            <arg>${targetMetastoreUri}</arg>
+            <arg>-targetHiveServer2Uri</arg>
+            <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetStagingPath</arg>
+            <arg>${targetStagingPath}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-maxEvents</arg>
+            <arg>${maxEvents}</arg>
+            <arg>-clusterForJobRun</arg>
+            <arg>${clusterForJobRun}</arg>
+            <arg>-clusterForJobRunWriteEP</arg>
+            <arg>${clusterForJobRunWriteEP}</arg>
+            <arg>-drJobName</arg>
+            <arg>${drJobName}-${nominalTime}</arg>
+            <arg>-executionStage</arg>
+            <arg>lastevents</arg>
+        </java>
+        <ok to="export-dr-replication"/>
+        <error to="failure"/>
+    </action>
+    <!-- Export Replication action -->
+    <action name="export-dr-replication">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp,hive,hive2,hcatalog</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-falconLibPath</arg>
+            <arg>${wf:conf("falcon.libpath")}</arg>
+            <arg>-replicationMaxMaps</arg>
+            <arg>${replicationMaxMaps}</arg>
+            <arg>-distcpMaxMaps</arg>
+            <arg>${distcpMaxMaps}</arg>
+            <arg>-sourceCluster</arg>
+            <arg>${sourceCluster}</arg>
+            <arg>-sourceMetastoreUri</arg>
+            <arg>${sourceMetastoreUri}</arg>
+            <arg>-sourceHiveServer2Uri</arg>
+            <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
+            <arg>-sourceTable</arg>
+            <arg>${sourceTable}</arg>
+            <arg>-sourceStagingPath</arg>
+            <arg>${sourceStagingPath}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-targetCluster</arg>
+            <arg>${targetCluster}</arg>
+            <arg>-targetMetastoreUri</arg>
+            <arg>${targetMetastoreUri}</arg>
+            <arg>-targetHiveServer2Uri</arg>
+            <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetStagingPath</arg>
+            <arg>${targetStagingPath}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-maxEvents</arg>
+            <arg>${maxEvents}</arg>
+            <arg>-distcpMapBandwidth</arg>
+            <arg>${distcpMapBandwidth}</arg>
+            <arg>-clusterForJobRun</arg>
+            <arg>${clusterForJobRun}</arg>
+            <arg>-clusterForJobRunWriteEP</arg>
+            <arg>${clusterForJobRunWriteEP}</arg>
+            <arg>-drJobName</arg>
+            <arg>${drJobName}-${nominalTime}</arg>
+            <arg>-executionStage</arg>
+            <arg>export</arg>
+        </java>
+        <ok to="import-dr-replication"/>
+        <error to="failure"/>
+    </action>
+    <!-- Import Replication action -->
+    <action name="import-dr-replication">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp,hive,hive2,hcatalog</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-falconLibPath</arg>
+            <arg>${wf:conf("falcon.libpath")}</arg>
+            <arg>-replicationMaxMaps</arg>
+            <arg>${replicationMaxMaps}</arg>
+            <arg>-distcpMaxMaps</arg>
+            <arg>${distcpMaxMaps}</arg>
+            <arg>-sourceCluster</arg>
+            <arg>${sourceCluster}</arg>
+            <arg>-sourceMetastoreUri</arg>
+            <arg>${sourceMetastoreUri}</arg>
+            <arg>-sourceHiveServer2Uri</arg>
+            <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
+            <arg>-sourceTable</arg>
+            <arg>${sourceTable}</arg>
+            <arg>-sourceStagingPath</arg>
+            <arg>${sourceStagingPath}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-targetCluster</arg>
+            <arg>${targetCluster}</arg>
+            <arg>-targetMetastoreUri</arg>
+            <arg>${targetMetastoreUri}</arg>
+            <arg>-targetHiveServer2Uri</arg>
+            <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetStagingPath</arg>
+            <arg>${targetStagingPath}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-maxEvents</arg>
+            <arg>${maxEvents}</arg>
+            <arg>-distcpMapBandwidth</arg>
+            <arg>${distcpMapBandwidth}</arg>
+            <arg>-clusterForJobRun</arg>
+            <arg>${clusterForJobRun}</arg>
+            <arg>-clusterForJobRunWriteEP</arg>
+            <arg>${clusterForJobRunWriteEP}</arg>
+            <arg>-drJobName</arg>
+            <arg>${drJobName}-${nominalTime}</arg>
+            <arg>-executionStage</arg>
+            <arg>import</arg>
+        </java>
+        <ok to="success"/>
+        <error to="failure"/>
+    </action>
+    <decision name="success">
+        <switch>
+            <case to="successAlert">
+                ${drNotificationReceivers ne 'NA'}
+            </case>
+            <default to="end"/>
+        </switch>
+    </decision>
+    <decision name="failure">
+        <switch>
+            <case to="failureAlert">
+                ${drNotificationReceivers ne 'NA'}
+            </case>
+            <default to="fail"/>
+        </switch>
+    </decision>
+    <action name="successAlert">
+        <email xmlns="uri:oozie:email-action:0.2">
+            <to>${drNotificationReceivers}</to>
+            <subject>INFO: Hive DR workflow ${drJobName} completed successfully</subject>
+            <body>
+                The Hive DR workflow ${wf:id()} is successful.
+                Source          = ${sourceCluster}
+                Target          = ${targetCluster}
+                DB Name         = ${sourceDatabase}
+                Table Name      = ${sourceTable}
+            </body>
+        </email>
+        <ok to="end"/>
+        <error to="end"/>
+    </action>
+    <action name="failureAlert">
+        <email xmlns="uri:oozie:email-action:0.2">
+            <to>${drNotificationReceivers}</to>
+            <subject>ERROR: Hive DR workflow ${drJobName} failed</subject>
+            <body>
+                The Hive DR workflow ${wf:id()} had issues and was killed.  The error message is: ${wf:errorMessage(wf:lastErrorNode())}
+                Source          = ${sourceCluster}
+                Target          = ${targetCluster}
+                DB Name         = ${sourceDatabase}
+                Table Name      = ${sourceTable}
+            </body>
+        </email>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>
+            Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+        </message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
new file mode 100644
index 0000000..42ae30b
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##### NOTE: This is a TEMPLATE file which can be copied and edited
+
+##### Recipe properties
+falcon.recipe.name=hive-disaster-recovery
+
+
+##### Workflow properties
+falcon.recipe.workflow.name=hive-dr-workflow
+# Provide Wf absolute path. This can be HDFS or local FS path. If WF is on local FS it will be copied to HDFS
+falcon.recipe.workflow.path=/recipes/hive-replication/hive-disaster-recovery-workflow.xml
+
+##### Cluster properties
+
+# Change the cluster name where replication job should run here
+falcon.recipe.cluster.name=backupCluster
+# Change the cluster hdfs write end point here. This is mandatory.
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020
+# Change the cluster validity start time here
+falcon.recipe.cluster.validity.start=2014-10-01T00:00Z
+# Change the cluster validity end time here
+falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
+
+##### Scheduling properties
+
+# Change the process frequency here. Valid frequency types are minutes, hours, days, months
+falcon.recipe.process.frequency=minutes(60)
+
+##### Retry policy properties
+
+falcon.recipe.retry.policy=periodic
+falcon.recipe.retry.delay=minutes(30)
+falcon.recipe.retry.attempts=3
+
+##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma
+##### Uncomment to add tags
+#falcon.recipe.tags=owner=landing,pipeline=adtech
+
+##### ACL properties - Uncomment and change ACL if authorization is enabled
+
+#falcon.recipe.acl.owner=testuser
+#falcon.recipe.acl.group=group
+#falcon.recipe.acl.permission=0x755
+
+##### Custom Job properties
+
+##### Source Cluster DR properties
+sourceCluster=primaryCluster
+sourceMetastoreUri=thrift://localhost:9083
+sourceHiveServer2Uri=hive2://localhost:10000
+# For DB level replication to replicate multiple databases specify comma separated list of databases
+sourceDatabase=default
+# For DB level replication specify * for sourceTable.
+# For table level replication to replicate multiple tables specify comma separated list of tables
+sourceTable=testtable_dr
+sourceStagingPath=/apps/hive/tools/dr
+sourceNN=hdfs://localhost:8020
+
+##### Target Cluster DR properties
+targetCluster=backupCluster
+targetMetastoreUri=thrift://localhost:9083
+targetHiveServer2Uri=hive2://localhost:10000
+targetStagingPath=/apps/hive/tools/dr
+targetNN=hdfs://localhost:8020
+
+# To cap the max events processed each time the job runs, set it to the max value depending on your bandwidth limit.
+# Setting it to -1 will process all the events but can hog up the bandwidth. Use it judiciously!
+maxEvents=-1
+# Change it to specify the maximum number of mappers for replication
+replicationMaxMaps=5
+# Change it to specify the maximum number of mappers for DistCP
+distcpMaxMaps=1
+# Change it to specify the bandwidth in MB for each mapper in DistCP
+distcpMapBandwidth=100
+
+##### Email on failure
+drNotificationReceivers=NA
\ No newline at end of file
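
The properties file above is the user-facing knob set for the Hive DR recipe. As rough orientation only, a minimal sketch (the local file name and path are assumptions) of loading this template with java.util.Properties and reading the two keys that switch between DB-level and table-level replication:

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

// Sketch only: load the hive-disaster-recovery.properties template and
// inspect the replication scope keys. sourceTable=* means DB-level
// replication; a comma separated table list means table-level replication.
public class RecipePropertiesSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream("hive-disaster-recovery.properties")) {
            props.load(in);
        }
        System.out.println("sourceDatabase = " + props.getProperty("sourceDatabase"));
        System.out.println("sourceTable    = " + props.getProperty("sourceTable"));
    }
}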

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index afa91c9..c162125 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -105,6 +105,12 @@
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-webhcat-java-client</artifactId>
+            <version>${hive.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 148f789..11f6bff 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -124,8 +124,17 @@ public class FalconCLI {
     // Recipe Command
     public static final String RECIPE_CMD = "recipe";
     public static final String RECIPE_NAME = "name";
+    public static final String RECIPE_OPERATION= "operation";
     public static final String RECIPE_TOOL_CLASS_NAME = "tool";
 
+    /**
+     * Recipe operation enum.
+     */
+    public static enum RecipeOperation {
+        HDFS_REPLICATION,
+        HIVE_DISASTER_RECOVERY
+    }
+
     private final Properties clientProperties;
 
     public FalconCLI() throws Exception {
@@ -914,6 +923,9 @@ public class FalconCLI {
         Option recipeToolClassName = new Option(RECIPE_TOOL_CLASS_NAME, true, "recipe class");
         recipeOptions.addOption(recipeToolClassName);
 
+        Option recipeOperation = new Option(RECIPE_OPERATION, true, "recipe operation");
+        recipeOptions.addOption(recipeOperation);
+
         return recipeOptions;
     }
 
@@ -1005,11 +1017,23 @@ public class FalconCLI {
     private void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
         String recipeName = commandLine.getOptionValue(RECIPE_NAME);
         String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME);
+        String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION);
 
         validateNotEmpty(recipeName, RECIPE_NAME);
+        validateNotEmpty(recipeOperation, RECIPE_OPERATION);
+        validateRecipeOperations(recipeOperation);
 
-        String result =
-            client.submitRecipe(recipeName, recipeToolClass).getMessage();
+        String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation).toString();
         OUT.get().println(result);
     }
+
+    private static void validateRecipeOperations(String recipeOperation) throws FalconCLIException {
+        for(RecipeOperation operation : RecipeOperation.values()) {
+            if (operation.toString().equalsIgnoreCase(recipeOperation)) {
+                return;
+            }
+        }
+        throw new FalconCLIException("Allowed Recipe operations: "
+                + java.util.Arrays.asList((RecipeOperation.values())));
+    }
 }
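
For reference, a standalone sketch (hypothetical class, not part of the patch) of the case-insensitive operation check that validateRecipeOperations above performs against the RecipeOperation enum:

import java.util.Arrays;

// Hypothetical sketch mirroring FalconCLI.validateRecipeOperations: accept
// any casing of a known recipe operation, reject everything else.
public class RecipeOperationCheckSketch {
    enum RecipeOperation { HDFS_REPLICATION, HIVE_DISASTER_RECOVERY }

    static void validate(String operation) {
        for (RecipeOperation known : RecipeOperation.values()) {
            if (known.toString().equalsIgnoreCase(operation)) {
                return; // e.g. "hive_disaster_recovery" or "HDFS_REPLICATION" both pass
            }
        }
        throw new IllegalArgumentException(
                "Allowed Recipe operations: " + Arrays.asList(RecipeOperation.values()));
    }

    public static void main(String[] args) {
        validate("hdfs_replication");       // accepted
        validate("not-a-recipe-operation"); // throws IllegalArgumentException
    }
}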

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 9649e10..d9bdf64 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -963,7 +963,8 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     public APIResult submitRecipe(String recipeName,
-                               String recipeToolClassName) throws FalconCLIException {
+                               String recipeToolClassName,
+                               final String recipeOperation) throws FalconCLIException {
         String recipePath = clientProperties.getProperty("falcon.recipe.path");
 
         if (StringUtils.isEmpty(recipePath)) {
@@ -999,6 +1000,7 @@ public class FalconClient extends AbstractFalconClient {
                 "-" + RecipeToolArgs.RECIPE_FILE_ARG.getName(), recipeFilePath,
                 "-" + RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG.getName(), 
propertiesFilePath,
                 "-" + 
RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG.getName(), processFile,
+                "-" + RecipeToolArgs.RECIPE_OPERATION_ARG.getName(), 
recipeOperation,
             };
 
             if (recipeToolClassName != null) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java
new file mode 100644
index 0000000..cf24078
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Properties;
+import java.io.File;
+
+/**
+ * Hdfs Replication recipe tool for Falcon recipes.
+ */
+public class HdfsReplicationRecipeTool implements Recipe {
+
+    private static final String COMMA_SEPARATOR = ",";
+
+    @Override
+    public void validate(final Properties recipeProperties) {
+        for (HdfsReplicationRecipeToolOptions option : HdfsReplicationRecipeToolOptions.values()) {
+            if (recipeProperties.getProperty(option.getName()) == null && option.isRequired()) {
+                throw new IllegalArgumentException("Missing argument: " + option.getName());
+            }
+        }
+    }
+
+    @Override
+    public Properties getAdditionalSystemProperties(final Properties recipeProperties) {
+        Properties additionalProperties = new Properties();
+
+        // Construct fully qualified hdfs src path
+        String srcPaths = recipeProperties.getProperty(HdfsReplicationRecipeToolOptions
+                .REPLICATION_SOURCE_DIR.getName());
+        StringBuilder absoluteSrcPaths = new StringBuilder();
+        String srcFsPath = recipeProperties.getProperty(
+                HdfsReplicationRecipeToolOptions.REPLICATION_SOURCE_CLUSTER_FS_WRITE_ENDPOINT.getName());
+        if (StringUtils.isNotEmpty(srcFsPath)) {
+            srcFsPath = StringUtils.removeEnd(srcFsPath, File.separator);
+        }
+        if (StringUtils.isNotEmpty(srcPaths)) {
+            String[] paths = srcPaths.split(COMMA_SEPARATOR);
+
+            for (String path : paths) {
+                StringBuilder srcpath = new StringBuilder(srcFsPath);
+                srcpath.append(path.trim());
+                srcpath.append(COMMA_SEPARATOR);
+                absoluteSrcPaths.append(srcpath);
+            }
+        }
+
+        additionalProperties.put(HdfsReplicationRecipeToolOptions.REPLICATION_SOURCE_DIR.getName(),
+                StringUtils.removeEnd(absoluteSrcPaths.toString(), COMMA_SEPARATOR));
+        return additionalProperties;
+    }
+}
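
Assuming the class above is on the classpath, a small sketch of what getAdditionalSystemProperties derives: relative drSourceDir entries are qualified with the drSourceClusterFS endpoint and re-joined with commas (the property keys come from HdfsReplicationRecipeToolOptions; sample values below are placeholders):

import java.util.Properties;
import org.apache.falcon.recipe.HdfsReplicationRecipeTool;

// Sketch: feed sample values through HdfsReplicationRecipeTool and print the
// fully qualified source dir list it builds (on a Unix-like JVM where
// File.separator is "/", the trailing slash on the endpoint is trimmed).
public class HdfsRecipePathSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("drSourceDir", "/apps/data/in, /apps/data/out");
        props.setProperty("drSourceClusterFS", "hdfs://source-nn:8020/");

        Properties extra = new HdfsReplicationRecipeTool().getAdditionalSystemProperties(props);
        // Prints: hdfs://source-nn:8020/apps/data/in,hdfs://source-nn:8020/apps/data/out
        System.out.println(extra.getProperty("drSourceDir"));
    }
}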

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java
new file mode 100644
index 0000000..4c3b543
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+/**
+ * Hdfs Recipe tool options.
+ */
+public enum HdfsReplicationRecipeToolOptions {
+    REPLICATION_SOURCE_DIR("drSourceDir", "Location of source data to replicate"),
+    REPLICATION_SOURCE_CLUSTER_FS_WRITE_ENDPOINT("drSourceClusterFS", "Source replication cluster end point"),
+    REPLICATION_TARGET_DIR("drTargetDir", "Location on target cluster for replication"),
+    REPLICATION_TARGET_CLUSTER_FS_WRITE_ENDPOINT("drTargetClusterFS", "Target replication cluster end point"),
+    REPLICATION_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during replication"),
+    REPLICATION_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication");
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+
+    HdfsReplicationRecipeToolOptions(String name, String description) {
+        this(name, description, true);
+    }
+
+    HdfsReplicationRecipeToolOptions(String name, String description, boolean isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
new file mode 100644
index 0000000..8b39673
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatDatabase;
+import org.apache.hive.hcatalog.api.HCatTable;
+import org.apache.hive.hcatalog.api.ObjectNotFoundException;
+import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hive.hcatalog.common.HCatException;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Hive Replication recipe tool for Falcon recipes.
+ */
+public class HiveReplicationRecipeTool implements Recipe {
+    private static final String ALL_TABLES = "*";
+
+    @Override
+    public void validate(final Properties recipeProperties) throws Exception {
+        for (HiveReplicationRecipeToolOptions option : HiveReplicationRecipeToolOptions.values()) {
+            if (recipeProperties.getProperty(option.getName()) == null && option.isRequired()) {
+                throw new IllegalArgumentException("Missing argument: " + option.getName());
+            }
+        }
+
+        HCatClient sourceMetastoreClient = null;
+        HCatClient targetMetastoreClient = null;
+        try {
+            // Validate if DB exists - source and target
+            sourceMetastoreClient = getHiveMetaStoreClient(
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_SOURCE_METASTORE_URI.getName()),
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName()),
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()));
+
+            String sourceDbList = recipeProperties.getProperty(
+                    HiveReplicationRecipeToolOptions.REPLICATION_SOURCE_DATABASE.getName());
+
+            if (StringUtils.isEmpty(sourceDbList)) {
+                throw new Exception("No source DB specified in property file");
+            }
+
+            String sourceTableList = recipeProperties.getProperty(
+                    HiveReplicationRecipeToolOptions.REPLICATION_SOURCE_TABLE.getName());
+            if (StringUtils.isEmpty(sourceTableList)) {
+                throw new Exception("No source table specified in property 
file. For DB replication please specify * "
+                        + "for sourceTable");
+            }
+
+            String[] srcDbs = sourceDbList.split(",");
+            if (srcDbs.length <= 0) {
+                throw new Exception("No source DB specified in property file");
+            }
+            for (String db : srcDbs) {
+                if (!dbExists(sourceMetastoreClient, db)) {
+                    throw new Exception("Database " + db + " doesn't exist on source cluster");
+                }
+            }
+
+            if (!sourceTableList.equals(ALL_TABLES)) {
+                String[] srcTables = sourceTableList.split(",");
+                if (srcTables.length > 0) {
+                    for (String table : srcTables) {
+                        if (!tableExists(sourceMetastoreClient, srcDbs[0], table)) {
+                            throw new Exception("Table " + table + " doesn't exist on source cluster");
+                        }
+                    }
+                }
+            }
+
+            targetMetastoreClient = getHiveMetaStoreClient(
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_TARGET_METASTORE_URI.getName()),
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName()),
+                    recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+                            .REPLICATION_TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()));
+            // Verify db exists on target
+            for (String db : srcDbs) {
+                if (!dbExists(targetMetastoreClient, db)) {
+                    throw new Exception("Database " + db + " doesn't exist on target cluster");
+                }
+            }
+        } finally {
+            if (sourceMetastoreClient != null) {
+                sourceMetastoreClient.close();
+            }
+            if (targetMetastoreClient != null) {
+                targetMetastoreClient.close();
+            }
+        }
+    }
+
+    @Override
+    public Properties getAdditionalSystemProperties(final Properties recipeProperties) {
+        Properties additionalProperties = new Properties();
+        String recipeName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName());
+        // Add recipe name as Hive DR job
+        additionalProperties.put(HiveReplicationRecipeToolOptions.HIVE_DR_JOB_NAME.getName(), recipeName);
+        additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_RUN.getName(),
+                recipeProperties.getProperty(RecipeToolOptions.CLUSTER_NAME.getName()));
+        additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_RUN_WRITE_EP.getName(),
+                recipeProperties.getProperty(RecipeToolOptions.CLUSTER_HDFS_WRITE_ENDPOINT.getName()));
+        if (StringUtils.isNotEmpty(recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()))) {
+            additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL.getName(),
+                    recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()));
+        }
+        return additionalProperties;
+    }
+
+    private HCatClient getHiveMetaStoreClient(String metastoreUrl, String metastorePrincipal,
+                                              String hive2Principal) throws Exception {
+        try {
+            HiveConf hcatConf = createHiveConf(new Configuration(false), metastoreUrl,
+                    metastorePrincipal, hive2Principal);
+            return HCatClient.create(hcatConf);
+        } catch (IOException e) {
+            throw new Exception("Exception creating HCatClient: " + 
e.getMessage(), e);
+        }
+    }
+
+    private static HiveConf createHiveConf(Configuration conf, String metastoreUrl, String metastorePrincipal,
+                                           String hive2Principal) throws IOException {
+        HiveConf hcatConf = new HiveConf(conf, HiveConf.class);
+
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        if (StringUtils.isNotEmpty(metastorePrincipal)) {
+            hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal);
+            hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+            hcatConf.set(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI.varname, "true");
+        }
+        if (StringUtils.isNotEmpty(hive2Principal)) {
+            hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, hive2Principal);
+            hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "kerberos");
+        }
+
+        return hcatConf;
+    }
+
+    private static boolean tableExists(HCatClient client, final String database, final String tableName)
+        throws Exception {
+        try {
+            HCatTable table = client.getTable(database, tableName);
+            return table != null;
+        } catch (ObjectNotFoundException e) {
+            System.out.println(e.getMessage());
+            return false;
+        } catch (HCatException e) {
+            throw new Exception("Exception checking if the table exists:" + 
e.getMessage(), e);
+        }
+    }
+
+    private static boolean dbExists(HCatClient client, final String database)
+        throws Exception {
+        try {
+            HCatDatabase db = client.getDatabase(database);
+            return db != null;
+        } catch (ObjectNotFoundException e) {
+            System.out.println(e.getMessage());
+            return false;
+        } catch (HCatException e) {
+            throw new Exception("Exception checking if the db exists:" + 
e.getMessage(), e);
+        }
+    }
+}
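
Outside the recipe tool, roughly the same metastore check can be reproduced with the HCatalog client API; a sketch for an unsecured cluster (the thrift URI and database name are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.hcatalog.api.HCatClient;

// Sketch: connect to a Hive metastore and confirm a database exists, which is
// the core of the validate() checks above (minus the kerberos settings).
public class HiveDrValidateSketch {
    public static void main(String[] args) throws Exception {
        HiveConf hcatConf = new HiveConf(new Configuration(false), HiveConf.class);
        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://source-metastore:9083");

        HCatClient client = HCatClient.create(hcatConf);
        try {
            // Throws ObjectNotFoundException when the database is missing,
            // which the recipe tool turns into a validation failure.
            System.out.println("Found database: " + client.getDatabase("default").getName());
        } finally {
            client.close();
        }
    }
}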

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
new file mode 100644
index 0000000..ec0465d
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+/**
+ * Hive Recipe tool options.
+ */
+public enum HiveReplicationRecipeToolOptions {
+    REPLICATION_SOURCE_CLUSTER("sourceCluster", "Replication source cluster name"),
+    REPLICATION_SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri"),
+    REPLICATION_SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"),
+    REPLICATION_SOURCE_DATABASE("sourceDatabase", "List of databases to replicate"),
+    REPLICATION_SOURCE_TABLE("sourceTable", "List of tables to replicate"),
+    REPLICATION_SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging path"),
+    REPLICATION_SOURCE_NN("sourceNN", "Source name node"),
+    REPLICATION_SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name node kerberos principal", false),
+    REPLICATION_SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal",
+            "Source hive metastore kerberos principal", false),
+    REPLICATION_SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal",
+            "Source hiveserver2 kerberos principal", false),
+
+    REPLICATION_TARGET_CLUSTER("targetCluster", "Replication target cluster name"),
+    REPLICATION_TARGET_METASTORE_URI("targetMetastoreUri", "Target Hive metastore uri"),
+    REPLICATION_TARGET_HS2_URI("targetHiveServer2Uri", "Target HS2 uri"),
+    REPLICATION_TARGET_STAGING_PATH("targetStagingPath", "Location of target staging path"),
+    REPLICATION_TARGET_NN("targetNN", "Target name node"),
+    REPLICATION_TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name node kerberos principal", false),
+    REPLICATION_TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal",
+            "Target hive metastore kerberos principal", false),
+    REPLICATION_TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal",
+            "Target hiveserver2 kerberos principal", false),
+
+    REPLICATION_MAX_EVENTS("maxEvents", "Maximum events to replicate"),
+    REPLICATION_MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during replication"),
+    DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp"),
+    REPLICATION_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication"),
+    CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job runs", false),
+    CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
+            "Write EP of cluster on which replication job runs", false),
+    CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false),
+    HIVE_DR_JOB_NAME("drJobName", "Unique hive DR job name", false);
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+
+    HiveReplicationRecipeToolOptions(String name, String description) {
+        this(name, description, true);
+    }
+
+    HiveReplicationRecipeToolOptions(String name, String description, boolean isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/Recipe.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/Recipe.java b/client/src/main/java/org/apache/falcon/recipe/Recipe.java
new file mode 100644
index 0000000..609131d
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/Recipe.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import java.util.Properties;
+
+/**
+ * Recipe interface.
+ */
+public interface Recipe {
+    void validate(final Properties recipeProperties) throws Exception;
+    Properties getAdditionalSystemProperties(final Properties recipeProperties);
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java b/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java
new file mode 100644
index 0000000..32b0871
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.falcon.cli.FalconCLI.RecipeOperation;
+
+/**
+ * Recipe factory.
+ */
+public final class RecipeFactory {
+
+    private RecipeFactory() {
+    }
+
+    public static Recipe getRecipeToolType(String recipeType) {
+        if (recipeType == null) {
+            return null;
+        }
+
+        if (RecipeOperation.HDFS_REPLICATION.toString().equalsIgnoreCase(recipeType)) {
+            return new HdfsReplicationRecipeTool();
+        } else if (RecipeOperation.HIVE_DISASTER_RECOVERY.toString().equalsIgnoreCase(recipeType)) {
+            return new HiveReplicationRecipeTool();
+        }
+        return null;
+    }
+
+}
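
The factory is consumed by RecipeTool.run (next diff); a condensed sketch of that wiring, with an empty Properties object standing in for the loaded recipe properties file:

import java.util.Properties;
import org.apache.falcon.recipe.Recipe;
import org.apache.falcon.recipe.RecipeFactory;

// Sketch of the dispatch performed by RecipeTool.run: resolve the recipe
// implementation for the CLI operation, validate the user properties, then
// merge any derived properties back in before the process XML is generated.
public class RecipeDispatchSketch {
    public static void main(String[] args) throws Exception {
        Properties recipeProperties = new Properties(); // normally loaded from the .properties file

        Recipe recipe = RecipeFactory.getRecipeToolType("HIVE_DISASTER_RECOVERY");
        if (recipe != null) {
            recipe.validate(recipeProperties); // fails here if required keys are missing
            Properties extra = recipe.getAdditionalSystemProperties(recipeProperties);
            if (extra != null && !extra.isEmpty()) {
                recipeProperties.putAll(extra);
            }
        }
    }
}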

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
index 069db9f..243ff4d 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
@@ -19,44 +19,44 @@
 package org.apache.falcon.recipe;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.recipe.util.RecipeProcessBuilderUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.commons.cli.Options;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.InputStream;
-import java.io.IOException;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.OutputStream;
-import java.util.Map;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Base recipe tool for Falcon recipes.
  */
 public class RecipeTool extends Configured implements Tool {
     private static final String HDFS_WF_PATH = "falcon" + File.separator + "recipes" + File.separator;
-    private static final String RECIPE_PREFIX = "falcon.recipe.";
-    private static final Pattern RECIPE_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");
-
-    private FileSystem hdfsFileSystem;
+    private static final FsPermission FS_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.READ, FsAction.NONE);
+    private static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS";
+    private static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
 
     public static void main(String[] args) throws Exception {
         ToolRunner.run(new Configuration(), new RecipeTool(), args);
@@ -64,25 +64,38 @@ public class RecipeTool extends Configured implements Tool {
 
     @Override
     public int run(String[] arguments) throws Exception {
+
         Map<RecipeToolArgs, String> argMap = setupArgs(arguments);
         if (argMap == null || argMap.isEmpty()) {
             throw new Exception("Arguments passed to recipe is null");
         }
-
+        Configuration conf = getConf();
         String recipePropertiesFilePath = argMap.get(RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG);
         Properties recipeProperties = loadProperties(recipePropertiesFilePath);
         validateProperties(recipeProperties);
 
-        FileSystem fs = getFileSystemForHdfs(recipeProperties);
+        String recipeOperation = argMap.get(RecipeToolArgs.RECIPE_OPERATION_ARG);
+        Recipe recipeType = RecipeFactory.getRecipeToolType(recipeOperation);
+        if (recipeType != null) {
+            recipeType.validate(recipeProperties);
+            Properties props = recipeType.getAdditionalSystemProperties(recipeProperties);
+            if (props != null && !props.isEmpty()) {
+                recipeProperties.putAll(props);
+            }
+        }
 
+        String processFilename;
+
+        FileSystem fs = getFileSystemForHdfs(recipeProperties, conf);
         validateArtifacts(recipeProperties, fs);
 
-        String recipeName = FilenameUtils.getBaseName(recipePropertiesFilePath);
+        String recipeName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName());
         copyFilesToHdfsIfRequired(recipeProperties, fs, recipeName);
 
-        Map<String, String> overlayMap = getOverlay(recipeProperties);
-        String processFilename = overlayParametersOverTemplate(argMap.get(RecipeToolArgs.RECIPE_FILE_ARG),
-                argMap.get(RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG), overlayMap);
+        processFilename = RecipeProcessBuilderUtils.createProcessFromTemplate(argMap.get(RecipeToolArgs
+                .RECIPE_FILE_ARG), recipeProperties, argMap.get(RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG));
+
+
         System.out.println("Generated process file to be scheduled: ");
         System.out.println(FileUtils.readFileToString(new File(processFilename)));
 
@@ -98,7 +111,7 @@ public class RecipeTool extends Configured implements Tool {
             addOption(options, arg, arg.isRequired());
         }
 
-        CommandLine cmd =  new GnuParser().parse(options, arguments);
+        CommandLine cmd = new GnuParser().parse(options, arguments);
         for (RecipeToolArgs arg : RecipeToolArgs.values()) {
             String optionValue = arg.getOptionValue(cmd);
             if (StringUtils.isNotEmpty(optionValue)) {
@@ -135,7 +148,7 @@ public class RecipeTool extends Configured implements Tool {
         }
     }
 
-    private static void validateArtifacts(final Properties recipeProperties, final FileSystem fs) throws Exception{
+    private static void validateArtifacts(final Properties recipeProperties, final FileSystem fs) throws Exception {
         // validate the WF path
         String wfPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName());
 
@@ -156,53 +169,6 @@ public class RecipeTool extends Configured implements Tool {
         }
     }
 
-    private static Map<String, String> getOverlay(final Properties recipeProperties) {
-        Map<String, String> overlay = new HashMap<String, String>();
-        for (Map.Entry<Object, Object> entry : recipeProperties.entrySet()) {
-            String key = StringUtils.removeStart((String) entry.getKey(), RECIPE_PREFIX);
-            overlay.put(key, (String) entry.getValue());
-        }
-
-        return overlay;
-    }
-
-    private static String overlayParametersOverTemplate(final String templateFile,
-                                                        final String outFilename,
-                                                        Map<String, String> overlay) throws Exception {
-        if (templateFile == null || outFilename == null || overlay == null || overlay.isEmpty()) {
-            throw new IllegalArgumentException("Invalid arguments passed");
-        }
-
-        String line;
-        OutputStream out = null;
-        BufferedReader reader = null;
-
-        try {
-            out = new FileOutputStream(outFilename);
-
-            reader = new BufferedReader(new FileReader(templateFile));
-            while ((line = reader.readLine()) != null) {
-                Matcher matcher = RECIPE_VAR_PATTERN.matcher(line);
-                while (matcher.find()) {
-                    String variable = line.substring(matcher.start(), matcher.end());
-                    String paramString = overlay.get(variable.substring(2, variable.length() - 2));
-                    if (paramString == null) {
-                        throw new Exception("Match not found for the template: " + variable
-                                + ". Please add it in recipe properties file");
-                    }
-                    line = line.replace(variable, paramString);
-                    matcher = RECIPE_VAR_PATTERN.matcher(line);
-                }
-                out.write(line.getBytes());
-                out.write("\n".getBytes());
-            }
-        } finally {
-            IOUtils.closeQuietly(reader);
-            IOUtils.closeQuietly(out);
-        }
-        return outFilename;
-    }
-
-    private static void copyFilesToHdfsIfRequired(final Properties recipeProperties,
                                                   final FileSystem fs,
                                                   final String recipeName) throws Exception {
@@ -262,7 +228,7 @@ public class RecipeTool extends Configured implements Tool {
     private static void createDirOnHdfs(String path, FileSystem fs) throws IOException {
         Path hdfsPath = new Path(path);
         if (!fs.exists(hdfsPath)) {
-            fs.mkdirs(hdfsPath);
+            FileSystem.mkdirs(fs, hdfsPath, FS_PERMISSION);
         }
     }
 
@@ -287,19 +253,33 @@ public class RecipeTool extends Configured implements Tool {
         fs.copyFromLocalFile(false, true, new Path(localFilePath), new Path(hdfsFilePath));
     }
 
-    private static Configuration getConfiguration(final String storageEndpoint) throws Exception {
-        Configuration conf = new Configuration();
-        conf.set("fs.defaultFS", storageEndpoint);
-        return conf;
+    private FileSystem getFileSystemForHdfs(final Properties recipeProperties,
+                                            final Configuration conf) throws Exception {
+        String storageEndpoint = RecipeToolOptions.CLUSTER_HDFS_WRITE_ENDPOINT.getName();
+        String nameNode = recipeProperties.getProperty(storageEndpoint);
+        conf.set(FS_DEFAULT_NAME_KEY, nameNode);
+        if (UserGroupInformation.isSecurityEnabled()) {
+            String nameNodePrincipal = 
recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName());
+            conf.set(NN_PRINCIPAL, nameNodePrincipal);
+        }
+        return createFileSystem(UserGroupInformation.getLoginUser(), new 
URI(nameNode), conf);
     }
 
-    private FileSystem getFileSystemForHdfs(final Properties recipeProperties) 
throws Exception {
-        if (hdfsFileSystem == null) {
-            String storageEndpoint = 
RecipeToolOptions.SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT.getName();
-            hdfsFileSystem =  FileSystem.get(
-                    
getConfiguration(recipeProperties.getProperty(storageEndpoint)));
-        }
+    private FileSystem createFileSystem(UserGroupInformation ugi, final URI 
uri,
+                                       final Configuration conf) throws 
Exception {
+        try {
+            final String proxyUserName = ugi.getShortUserName();
+            if 
(proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
+                return FileSystem.get(uri, conf);
+            }
 
-        return hdfsFileSystem;
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(uri, conf);
+                }
+            });
+        } catch (InterruptedException ex) {
+            throw new IOException("Exception creating FileSystem:" + 
ex.getMessage(), ex);
+        }
     }
 }
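
Note: the createFileSystem() change above follows the usual Hadoop idiom of wrapping FileSystem.get() in a doAs() block when the target user differs from the login user. A minimal sketch of that idiom (not the commit's code); the proxy user name "falcon" and the URI parameter are illustrative assumptions:

    import java.net.URI;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    // Sketch only: proxy user and endpoint are assumptions, not Falcon defaults.
    private static FileSystem getFileSystemAsProxyUser(final Configuration conf,
                                                       final URI uri) throws Exception {
        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(
                "falcon", UserGroupInformation.getLoginUser());
        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
            public FileSystem run() throws Exception {
                // Executed with the proxied user's credentials.
                return FileSystem.get(uri, conf);
            }
        });
    }
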

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java 
b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
index baa4846..79d8f18 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
@@ -25,10 +25,11 @@ import org.apache.commons.cli.Option;
  * Recipe tool args.
  */
 public enum RecipeToolArgs {
-    RECIPE_FILE_ARG("file", "recipe file path"),
+    RECIPE_FILE_ARG("file", "recipe template file path"),
     RECIPE_PROPERTIES_FILE_ARG("propertiesFile", "recipe properties file 
path"),
     RECIPE_PROCESS_XML_FILE_PATH_ARG(
-            "recipeProcessFilePath", "file path of recipe process to be 
submitted");
+            "recipeProcessFilePath", "file path of recipe process to be 
submitted"),
+    RECIPE_OPERATION_ARG("recipeOperation", "recipe operation");
 
     private final String name;
     private final String description;

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java 
b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
index a1c29cd..5df9b0a 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
@@ -18,19 +18,43 @@
 
 package org.apache.falcon.recipe;
 
+import java.util.Map;
+import java.util.HashMap;
+
 /**
  * Recipe tool options.
  */
 public enum RecipeToolOptions {
-    SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT(
-            "falcon.recipe.src.cluster.hdfs.writeEndPoint", "source cluster 
HDFS write endpoint"),
+    RECIPE_NAME("falcon.recipe.name", "Recipe name", false),
+    CLUSTER_NAME("falcon.recipe.cluster.name", "Cluster name where replication 
job should run", false),
+    CLUSTER_HDFS_WRITE_ENDPOINT(
+            "falcon.recipe.cluster.hdfs.writeEndPoint", "Cluster HDFS write 
endpoint"),
+    CLUSTER_VALIDITY_START("falcon.recipe.cluster.validity.start", "Source 
cluster validity start", false),
+    CLUSTER_VALIDITY_END("falcon.recipe.cluster.validity.end", "Source cluster 
validity end", false),
+    WORKFLOW_NAME("falcon.recipe.workflow.name", "Workflow name", false),
     WORKFLOW_PATH("falcon.recipe.workflow.path", "Workflow path", false),
-    WORKFLOW_LIB_PATH("falcon.recipe.workflow.lib.path", "WF lib path", false);
+    WORKFLOW_LIB_PATH("falcon.recipe.workflow.lib.path", "WF lib path", false),
+    PROCESS_FREQUENCY("falcon.recipe.process.frequency", "Process frequency", 
false),
+    RETRY_POLICY("falcon.recipe.retry.policy", "Retry policy", false),
+    RETRY_DELAY("falcon.recipe.retry.delay", "Retry delay", false),
+    RETRY_ATTEMPTS("falcon.recipe.retry.attempts", "Retry attempts", false),
+    RECIPE_TAGS("falcon.recipe.tags", "Recipe tags", false),
+    RECIPE_ACL_OWNER("falcon.recipe.acl.owner", "Recipe acl owner", false),
+    RECIPE_ACL_GROUP("falcon.recipe.acl.group", "Recipe acl group", false),
+    RECIPE_ACL_PERMISSION("falcon.recipe.acl.permission", "Recipe acl 
permission", false),
+    RECIPE_NN_PRINCIPAL("falcon.recipe.nn.principal", "Recipe DFS NN 
principal", false);
 
     private final String name;
     private final String description;
     private final boolean isRequired;
 
+    public static final Map<String, RecipeToolOptions> OPTIONSMAP = new 
HashMap<>();
+    static {
+        for (RecipeToolOptions c : RecipeToolOptions.values()) {
+            OPTIONSMAP.put(c.getName(), c);
+        }
+    }
+
     RecipeToolOptions(String name, String description) {
         this(name, description, true);
     }
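
Note: the new OPTIONSMAP gives the recipe tool a constant-time way to tell reserved recipe options apart from arbitrary user-supplied properties (see bindCustomProperties in RecipeProcessBuilderUtils below). A small sketch of that check, with a hypothetical property key:

    // Keys that are not reserved RecipeToolOptions entries are treated as custom
    // properties and copied onto the generated process entity.
    String key = "drNotificationReceivers";                              // hypothetical key
    boolean reserved = RecipeToolOptions.OPTIONSMAP.containsKey(key);    // false
    boolean known = RecipeToolOptions.OPTIONSMAP.containsKey("falcon.recipe.name");  // true
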

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
 
b/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
new file mode 100644
index 0000000..9522816
--- /dev/null
+++ 
b/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.process.ACL;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.PolicyType;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Retry;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.recipe.RecipeToolOptions;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.ValidationEvent;
+import javax.xml.bind.ValidationEventHandler;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Recipe builder utility.
+ */
+public final class RecipeProcessBuilderUtils {
+
+    private static final Pattern RECIPE_VAR_PATTERN = 
Pattern.compile("##[A-Za-z0-9_.]*##");
+
+    private RecipeProcessBuilderUtils() {
+    }
+
+    public static String createProcessFromTemplate(final String 
processTemplateFile, final Properties recipeProperties,
+                                                   final String 
processFilename) throws Exception {
+        org.apache.falcon.entity.v0.process.Process process = 
bindAttributesInTemplate(
+                processTemplateFile, recipeProperties);
+        String recipeProcessFilename = createProcessXmlFile(processFilename, 
process);
+
+        validateProcessXmlFile(recipeProcessFilename);
+        return recipeProcessFilename;
+    }
+
+    private static org.apache.falcon.entity.v0.process.Process
+    bindAttributesInTemplate(final String templateFile, final Properties 
recipeProperties)
+        throws Exception {
+        if (templateFile == null || recipeProperties == null) {
+            throw new IllegalArgumentException("Invalid arguments passed");
+        }
+
+        Unmarshaller unmarshaller = EntityType.PROCESS.getUnmarshaller();
+        // Validation can be skipped for unmarshalling as we only want to bind the
+        // template with the properties. Validation is handled as part of marshalling.
+        unmarshaller.setSchema(null);
+        unmarshaller.setEventHandler(new ValidationEventHandler() {
+                public boolean handleEvent(ValidationEvent validationEvent) {
+                    return true;
+                }
+            }
+        );
+
+        URL processResourceUrl = new File(templateFile).toURI().toURL();
+        org.apache.falcon.entity.v0.process.Process process =
+                (org.apache.falcon.entity.v0.process.Process) 
unmarshaller.unmarshal(processResourceUrl);
+
+        /* For optional properties, the user might set them directly in the process xml
+           rather than in the properties file. Before submission, validation is done to
+           confirm the process xml contains no unresolved RECIPE_VAR_PATTERN placeholders.
+        */
+
+        String processName = 
recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName());
+        if (StringUtils.isNotEmpty(processName)) {
+            process.setName(processName);
+        }
+
+        // DR process template has only one cluster
+        bindClusterProperties(process.getClusters().getClusters().get(0), 
recipeProperties);
+
+        // bind scheduling properties
+        String processFrequency = 
recipeProperties.getProperty(RecipeToolOptions.PROCESS_FREQUENCY.getName());
+        if (StringUtils.isNotEmpty(processFrequency)) {
+            process.setFrequency(Frequency.fromString(processFrequency));
+        }
+
+        bindWorkflowProperties(process.getWorkflow(), recipeProperties);
+        bindRetryProperties(process.getRetry(), recipeProperties);
+        bindACLProperties(process.getACL(), recipeProperties);
+        bindTagsProperties(process, recipeProperties);
+        bindCustomProperties(process.getProperties(), recipeProperties);
+
+        return process;
+    }
+
+    private static void bindClusterProperties(final Cluster cluster,
+                                              final Properties 
recipeProperties) {
+        // DR process template has only one cluster
+        String clusterName = 
recipeProperties.getProperty(RecipeToolOptions.CLUSTER_NAME.getName());
+        if (StringUtils.isNotEmpty(clusterName)) {
+            cluster.setName(clusterName);
+        }
+
+        String clusterStartValidity = 
recipeProperties.getProperty(RecipeToolOptions.CLUSTER_VALIDITY_START.getName());
+        if (StringUtils.isNotEmpty(clusterStartValidity)) {
+            
cluster.getValidity().setStart(SchemaHelper.parseDateUTC(clusterStartValidity));
+        }
+
+        String clusterEndValidity = 
recipeProperties.getProperty(RecipeToolOptions.CLUSTER_VALIDITY_END.getName());
+        if (StringUtils.isNotEmpty(clusterEndValidity)) {
+            
cluster.getValidity().setEnd(SchemaHelper.parseDateUTC(clusterEndValidity));
+        }
+    }
+
+    private static void bindWorkflowProperties(final Workflow wf,
+                                               final Properties 
recipeProperties) {
+        String wfName = 
recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_NAME.getName());
+        if (StringUtils.isNotEmpty(wfName)) {
+            wf.setName(wfName);
+        }
+
+        String wfLibPath = 
recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_LIB_PATH.getName());
+        if (StringUtils.isNotEmpty(wfLibPath)) {
+            wf.setLib(wfLibPath);
+        } else if (wf.getLib().startsWith("##")) {
+            wf.setLib("");
+        }
+
+        String wfPath = 
recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName());
+        if (StringUtils.isNotEmpty(wfPath)) {
+            wf.setPath(wfPath);
+        }
+    }
+
+    private static void bindRetryProperties(final Retry processRetry,
+                                            final Properties recipeProperties) 
{
+        String retryPolicy = 
recipeProperties.getProperty(RecipeToolOptions.RETRY_POLICY.getName());
+        if (StringUtils.isNotEmpty(retryPolicy)) {
+            processRetry.setPolicy(PolicyType.fromValue(retryPolicy));
+        }
+
+        String retryAttempts = 
recipeProperties.getProperty(RecipeToolOptions.RETRY_ATTEMPTS.getName());
+        if (StringUtils.isNotEmpty(retryAttempts)) {
+            processRetry.setAttempts(Integer.parseInt(retryAttempts));
+        }
+
+        String retryDelay = 
recipeProperties.getProperty(RecipeToolOptions.RETRY_DELAY.getName());
+        if (StringUtils.isNotEmpty(retryDelay)) {
+            processRetry.setDelay(Frequency.fromString(retryDelay));
+        }
+    }
+
+    private static void bindACLProperties(final ACL acl,
+                                          final Properties recipeProperties) {
+        String aclowner = 
recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_OWNER.getName());
+        if (StringUtils.isNotEmpty(aclowner)) {
+            acl.setOwner(aclowner);
+        }
+
+        String aclGroup = 
recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_GROUP.getName());
+        if (StringUtils.isNotEmpty(aclGroup)) {
+            acl.setGroup(aclGroup);
+        }
+
+        String aclPermission = 
recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_PERMISSION.getName());
+        if (StringUtils.isNotEmpty(aclPermission)) {
+            acl.setPermission(aclPermission);
+        }
+    }
+
+    private static void bindTagsProperties(final 
org.apache.falcon.entity.v0.process.Process process,
+                                           final Properties recipeProperties) {
+        String falconSystemTags = process.getTags();
+        String tags = 
recipeProperties.getProperty(RecipeToolOptions.RECIPE_TAGS.getName());
+        if (StringUtils.isNotEmpty(tags)) {
+            if (StringUtils.isNotEmpty(falconSystemTags)) {
+                tags += ", " + falconSystemTags;
+            }
+            process.setTags(tags);
+        }
+    }
+
+
+    private static void bindCustomProperties(final 
org.apache.falcon.entity.v0.process.Properties customProperties,
+                                             final Properties 
recipeProperties) {
+        List<Property> propertyList = new ArrayList<>();
+
+        for (Map.Entry<Object, Object> recipeProperty : 
recipeProperties.entrySet()) {
+            if 
(RecipeToolOptions.OPTIONSMAP.get(recipeProperty.getKey().toString()) == null) {
+                addProperty(propertyList, (String) recipeProperty.getKey(), 
(String) recipeProperty.getValue());
+            }
+        }
+
+        customProperties.getProperties().addAll(propertyList);
+    }
+
+    private static void addProperty(List<Property> propertyList, String name, 
String value) {
+        Property prop = new Property();
+        prop.setName(name);
+        prop.setValue(value);
+        propertyList.add(prop);
+    }
+
+    private static String createProcessXmlFile(final String outFilename,
+                                               final Entity entity) throws 
Exception {
+        if (outFilename == null || entity == null) {
+            throw new IllegalArgumentException("Invalid arguments passed");
+        }
+
+        EntityType type = EntityType.PROCESS;
+        OutputStream out = null;
+        try {
+            out = new FileOutputStream(outFilename);
+            type.getMarshaller().marshal(entity, out);
+        } catch (JAXBException e) {
+            throw new Exception("Unable to serialize the entity object " + 
type + "/" + entity.getName(), e);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+        return outFilename;
+    }
+
+    private static void validateProcessXmlFile(final String processFileName) 
throws Exception {
+        if (processFileName == null) {
+            throw new IllegalArgumentException("Invalid arguments passed");
+        }
+
+        String line;
+        BufferedReader reader = null;
+
+        try {
+            reader = new BufferedReader(new FileReader(processFileName));
+            while ((line = reader.readLine()) != null) {
+                Matcher matcher = RECIPE_VAR_PATTERN.matcher(line);
+                if (matcher.find()) {
+                    String variable = line.substring(matcher.start(), 
matcher.end());
+                    throw new Exception("Match not found for the template: " + 
variable
+                            + " in recipe template file. Please add it in 
recipe properties file");
+                }
+            }
+        } finally {
+            IOUtils.closeQuietly(reader);
+        }
+
+    }
+
+}
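
Note: as a rough usage sketch of the new entry point (every path below is hypothetical), a caller loads the recipe properties and hands them to createProcessFromTemplate(), which binds them into the template, writes the process xml, and fails if any ##...## placeholder is left unresolved:

    // Hypothetical driver; all file paths are illustrative only.
    public final class RecipeSubmitSketch {
        public static void main(String[] args) throws Exception {
            java.util.Properties recipeProperties = new java.util.Properties();
            java.io.FileInputStream in =
                    new java.io.FileInputStream("/tmp/hdfs-replication.properties");
            try {
                recipeProperties.load(in);
            } finally {
                in.close();
            }
            String processXml = org.apache.falcon.recipe.util.RecipeProcessBuilderUtils
                    .createProcessFromTemplate(
                            "/tmp/hdfs-replication-template.xml",    // recipe process template
                            recipeProperties,
                            "/tmp/hdfs-replication-process.xml");    // generated process xml
            System.out.println("Generated process file: " + processXml);
        }
    }
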

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki 
b/docs/src/site/twiki/InstallationSteps.twiki
index 3dd034b..90d765b 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -31,6 +31,8 @@ It builds and installs the package into the local repository, 
for use as a depen
 [optionally -Doozie.version=<<oozie version>> can be appended to build with a 
specific version of Oozie. Oozie versions
 >= 4 are supported]
 NOTE: Falcon builds with JDK 1.7 using -noverify option
+      To compile Falcon with Hive Replication, optionally "-P hadoop-2,hivedr" can be appended. This requires
+      Hive >= 1.2.0 and Oozie >= 4.2.0 to be available.
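
Note: for example, building on the standard install command documented above, the profile could be activated as (hypothetical invocation):

    mvn clean install -P hadoop-2,hivedr

with -Dhive.version/-Doozie.version overrides added only if the pom defaults do not already satisfy the version check enforced by the new hivedr profile.
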
 
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
 
b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index e3de6a4..49fb4f7 100644
--- 
a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ 
b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Properties;
 
 /**
@@ -47,10 +48,15 @@ public class SharedLibraryHostingService implements 
ConfigurationChangeListener
 
     private static final String[] LIBS = 
StartupProperties.get().getProperty("shared.libs").split(",");
 
-    private static final FalconPathFilter NON_FALCON_JAR_FILTER = new 
FalconPathFilter() {
+    private static class FalconLibPath implements  FalconPathFilter {
+        private String[] shareLibs;
+        FalconLibPath(String[] libList) {
+            this.shareLibs = Arrays.copyOf(libList, libList.length);
+        }
+
         @Override
         public boolean accept(Path path) {
-            for (String jarName : LIBS) {
+            for (String jarName : shareLibs) {
                 if (path.getName().startsWith(jarName)) {
                     return true;
                 }
@@ -60,7 +66,7 @@ public class SharedLibraryHostingService implements 
ConfigurationChangeListener
 
         @Override
         public String getJarName(Path path) {
-            for (String jarName : LIBS) {
+            for (String jarName : shareLibs) {
                 if (path.getName().startsWith(jarName)) {
                     return jarName;
                 }
@@ -84,9 +90,10 @@ public class SharedLibraryHostingService implements 
ConfigurationChangeListener
                     "lib");
             Path libext = new Path(ClusterHelper.getLocation(cluster, 
ClusterLocationType.WORKING).getPath(),
                     "libext");
+            FalconPathFilter nonFalconJarFilter = new FalconLibPath(LIBS);
             Properties properties = StartupProperties.get();
             pushLibsToHDFS(fs, properties.getProperty("system.lib.location"), 
lib,
-                    NON_FALCON_JAR_FILTER);
+                    nonFalconJarFilter);
             pushLibsToHDFS(fs, properties.getProperty("libext.paths"), libext, 
null);
             pushLibsToHDFS(fs, properties.getProperty("libext.feed.paths"),
                     new Path(libext, EntityType.FEED.name()) , null);
@@ -107,7 +114,6 @@ public class SharedLibraryHostingService implements 
ConfigurationChangeListener
         if (StringUtils.isEmpty(src)) {
             return;
         }
-
         LOG.debug("Copying libs from {}", src);
         createTargetPath(fs, target);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 34a5471..ec04bdf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,7 @@
                                <artifactId>hadoop-client</artifactId>
                                <version>${hadoop.version}</version>
                         <scope>provided</scope>
-                               <exclusions>
+                        <exclusions>
                             <exclusion>
                                 <groupId>com.sun.jersey</groupId>
                                 <artifactId>jersey-server</artifactId>
@@ -151,12 +151,12 @@
                                 <groupId>org.apache.hadoop</groupId>
                                 <artifactId>hadoop-annotations</artifactId>
                             </exclusion>
-                               </exclusions>
-                           </dependency>
-                           <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-hdfs</artifactId>
-                               <version>${hadoop.version}</version>
+                        </exclusions>
+                    </dependency>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-hdfs</artifactId>
+                        <version>${hadoop.version}</version>
                         <scope>provided</scope>
                         <exclusions>
                             <exclusion>
@@ -166,10 +166,10 @@
                         </exclusions>
                     </dependency>
 
-                           <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-common</artifactId>
-                               <version>${hadoop.version}</version>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-common</artifactId>
+                        <version>${hadoop.version}</version>
                         <scope>provided</scope>
                         <exclusions>
                             <exclusion>
@@ -179,10 +179,10 @@
                         </exclusions>
                     </dependency>
 
-                           <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               
<artifactId>hadoop-mapreduce-client-common</artifactId>
-                               <version>${hadoop.version}</version>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-mapreduce-client-common</artifactId>
+                        <version>${hadoop.version}</version>
                         <scope>provided</scope>
                         <exclusions>
                             <exclusion>
@@ -200,18 +200,18 @@
                         <scope>test</scope>
                     </dependency>
 
-                           <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-common</artifactId>
-                               <version>${hadoop.version}</version>
-                               <classifier>tests</classifier>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-common</artifactId>
+                        <version>${hadoop.version}</version>
+                        <classifier>tests</classifier>
                         <scope>test</scope>
                     </dependency>
 
-                           <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-                               <version>${hadoop.version}</version>
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+                        <version>${hadoop.version}</version>
                         <scope>provided</scope>
                     </dependency>
 
@@ -241,6 +241,13 @@
                         <version>${hadoop.version}</version>
                         <scope>provided</scope>
                     </dependency>
+
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-mapreduce-client-core</artifactId>
+                        <version>${hadoop.version}</version>
+                        <scope>provided</scope>
+                    </dependency>
                 </dependencies>
           </dependencyManagement>
         </profile>
@@ -255,7 +262,7 @@
                             <descriptors>
                                 
<descriptor>src/main/assemblies/distributed-package.xml</descriptor>
                                 
<descriptor>src/main/assemblies/src-package.xml</descriptor>
-                           </descriptors>
+                                       </descriptors>
                             
<finalName>apache-falcon-distributed-${project.version}</finalName>
                         </configuration>
                     </plugin>
@@ -368,11 +375,47 @@
             <properties>
                 <excluded.test.groups/>
             </properties>
-         </profile>
+        </profile>
+        <profile>
+            <id>hivedr</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-enforcer-plugin</artifactId>
+                        <version>1.3.1</version>
+                        <executions>
+                            <execution>
+                                <id>enforce-property</id>
+                                <goals>
+                                    <goal>enforce</goal>
+                                </goals>
+                                <configuration>
+                                    <rules>
+                                        <requireProperty>
+                                            <property>hive.version</property>
+                                            <regex>^(1.2.*)</regex>
+                                            <regexMessage>HiveDR only supports 
hive version >= 1.2.0</regexMessage>
+                                            <property>oozie.version</property>
+                                            <regex>^(4.2.*)</regex>
+                                            <regexMessage>HiveDR only supports 
oozie version >= 4.2.0</regexMessage>
+                                        </requireProperty>
+                                    </rules>
+                                    <fail>true</fail>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+            <modules>
+                <module>addons/hivedr</module>
+            </modules>
+        </profile>
     </profiles>
 
     <modules>
-       <module>falcon-ui</module>
+        <module>falcon-ui</module>
         <module>checkstyle</module>
         <module>build-tools</module>
         <module>client</module>
@@ -882,12 +925,15 @@
 
             <!--  this is needed for embedded oozie -->
             <dependency>
-                <groupId>org.apache.hive.hcatalog</groupId>
-                <artifactId>hive-webhcat-java-client</artifactId>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-exec</artifactId>
                 <version>${hive.version}</version>
                 <exclusions>
                     <exclusion>
-                        <!-- This implies you cannot use orc files -->
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-hdfs</artifactId>
+                    </exclusion>
+                    <exclusion>
                         <groupId>com.google.protobuf</groupId>
                         <artifactId>protobuf-java</artifactId>
                     </exclusion>
@@ -907,6 +953,18 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.hive.hcatalog</groupId>
+                <artifactId>hive-webhcat-java-client</artifactId>
+                <version>${hive.version}</version>
+                <exclusions>
+                    <exclusion> <!-- conflict with hadoop-auth -->
+                        <groupId>org.apache.httpcomponents</groupId>
+                        <artifactId>httpclient</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
                 <groupId>com.github.stephenc.findbugs</groupId>
                 <artifactId>findbugs-annotations</artifactId>
                 <version>1.3.9-1</version>
@@ -1034,7 +1092,7 @@
                     <version>2.8.1</version>
                 </plugin>
 
-               <plugin>
+                <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-surefire-plugin</artifactId>
                     <version>2.16</version>

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/replication/pom.xml
----------------------------------------------------------------------
diff --git a/replication/pom.xml b/replication/pom.xml
index 8c4d6b4..43b6463 100644
--- a/replication/pom.xml
+++ b/replication/pom.xml
@@ -26,9 +26,9 @@
         <artifactId>falcon-main</artifactId>
         <version>0.7-SNAPSHOT</version>
     </parent>
-    <artifactId>falcon-replication</artifactId>
-    <description>Apache Falcon Replication Module</description>
-    <name>Apache Falcon Replication</name>
+    <artifactId>falcon-distcp-replication</artifactId>
+    <description>Apache Falcon Distcp Replication Module</description>
+    <name>Apache Falcon Distcp Replication</name>
     <packaging>jar</packaging>
 
     <profiles>
@@ -46,6 +46,10 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-distcp</artifactId>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </dependency>
             </dependencies>
         </profile>
     </profiles>
