FALCON-1861 Support HDFS Snapshot based replication in Falcon

Documentation will be added in Jira FALCON-1908

Author: bvellanki <[email protected]>

Reviewers: Sowmya <[email protected]>, sandeepSamudrala <[email protected]>, Ying Zheng <[email protected]>, Venkat Ranganathan <[email protected]>

Closes #105 from bvellanki/master


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aba79aae
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aba79aae
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aba79aae

Branch: refs/heads/master
Commit: aba79aae2c7235f065413fe0c9f0af5077079371
Parents: 7c0481e
Author: bvellanki <[email protected]>
Authored: Thu Apr 21 16:48:16 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Thu Apr 21 16:48:16 2016 -0700

----------------------------------------------------------------------
 .../extensions/hdfs-snapshot-mirroring/README   |  31 +++
 .../extensions/hdfs-snapshot-mirroring/pom.xml  |  32 +++
 .../hdfs-snapshot-mirroring-properties.json     | 173 ++++++++++++
 .../hdfs-snapshot-mirroring-template.xml        |  45 +++
 .../hdfs-snapshot-mirroring-workflow.xml        | 172 ++++++++++++
 addons/hdfs-snapshot-mirroring/README           |  31 +++
 addons/hdfs-snapshot-mirroring/pom.xml          | 188 +++++++++++++
 .../replication/HdfsSnapshotReplicator.java     | 277 +++++++++++++++++++
 .../retention/HdfsSnapshotEvictor.java          | 208 ++++++++++++++
 .../falcon/snapshots/util/HdfsSnapshotUtil.java |  67 +++++
 .../replication/HdfsSnapshotReplicatorTest.java | 163 +++++++++++
 .../retention/HdfsSnapshotEvictorTest.java      |  98 +++++++
 .../src/test/resources/backup-cluster-0.1.xml   |  44 +++
 .../src/test/resources/primary-cluster-0.1.xml  |  43 +++
 .../org/apache/falcon/entity/ClusterHelper.java |  14 +
 .../falcon/hadoop/HadoopClientFactory.java      |  93 ++++++-
 .../apache/falcon/retention/EvictionHelper.java |   7 +-
 common/src/main/resources/startup.properties    |   2 +-
 .../falcon/retention/EvictionHelperTest.java    |  35 +++
 extensions/pom.xml                              |  29 ++
 .../falcon/extensions/AbstractExtension.java    |   3 +
 .../HdfsSnapshotMirrorProperties.java           |  84 ++++++
 .../HdfsSnapshotMirroringExtension.java         | 234 ++++++++++++++++
 .../util/ExtensionProcessBuilderUtils.java      |   2 +-
 .../apache/falcon/extensions/ExtensionTest.java | 165 ++++++++++-
 .../extensions/store/ExtensionStoreTest.java    |   6 +-
 .../src/test/resources/backup-cluster-0.1.xml   |   3 +-
 .../hdfs-snapshot-mirroring-template.xml        |  45 +++
 .../src/test/resources/primary-cluster-0.1.xml  |   2 +-
 pom.xml                                         |  43 ++-
 scheduler/src/test/resources/startup.properties |   2 +-
 src/conf/startup.properties                     |   2 +-
 src/main/assemblies/assembly-standalone.xml     |  40 +++
 src/main/assemblies/distributed-package.xml     |  40 +++
 src/main/assemblies/standalone-package.xml      |  40 +++
 test-util/pom.xml                               |   6 +
 .../cluster/util/MiniHdfsClusterUtil.java       |  52 ++++
 unit/pom.xml                                    |  24 +-
 .../unit/LocalFalconRPCClientFactory.java       |  16 +-
 unit/src/main/resources/startup.properties      |   2 +-
 webapp/pom.xml                                  |  10 +
 webapp/src/test/resources/startup.properties    |   2 +-
 42 files changed, 2537 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/README
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/README b/addons/extensions/hdfs-snapshot-mirroring/README
new file mode 100644
index 0000000..fc33d3a
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/README
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HDFS Snapshot Mirroring Extension.
+
+Overview
+This extension replicates snapshot-able directories on HDFS from one
+Hadoop cluster to another. It piggybacks on the snapshot capability in HDFS (HDFS-7535)
+and also performs retention on the snapshots generated on source and target.
+
+Use Case
+* Create a snapshot of the source directory
+* Copy the directory between HDFS clusters
+* Create a snapshot of the target directory
+* Handle snapshot retention in the source and target directories
+
+Limitations
+If TDE encryption is enabled, this snapshot-based replication is not efficient.
\ No newline at end of file
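
For orientation, the core mechanism this extension drives is HDFS snapshot-diff copying via DistCp. A minimal standalone sketch of that step (not Falcon code; the class name, paths, and snapshot names are illustrative):

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;

// Sketch: copy only the delta between two snapshots of a snapshot-able
// source directory. Assumes snapshots "s1" and "s2" exist on the source
// and "s1" also exists on the target from a previous run.
public class SnapshotDiffCopySketch {
    public static void main(String[] args) throws Exception {
        Path source = new Path("hdfs://primaryNN:8020/data/in");      // illustrative
        Path target = new Path("hdfs://backupNN:8020/data/replica");  // illustrative

        DistCpOptions options = new DistCpOptions(Collections.singletonList(source), target);
        options.setSyncFolder(true);           // snapshot diff is only valid with -update
        options.setUseDiff(true, "s1", "s2");  // apply just the s1 -> s2 changes on target

        new DistCp(new Configuration(), options).execute();
    }
}

After a successful copy, HdfsSnapshotReplicator (below) takes a snapshot with the same name on the target, so the next run has a common baseline to diff against.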

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/pom.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/pom.xml b/addons/extensions/hdfs-snapshot-mirroring/pom.xml
new file mode 100644
index 0000000..b0b4819
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.falcon.extensions</groupId>
+    <artifactId>falcon-hdfs-snapshot-mirroring-extension</artifactId>
+    <version>0.10-SNAPSHOT</version>
+    <description>Apache Falcon HDFS Snapshot Mirroring Extension</description>
+    <name>Apache Falcon HDFS Snapshot Mirroring Extension</name>
+    <packaging>jar</packaging>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
new file mode 100644
index 0000000..46554c1
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
@@ -0,0 +1,173 @@
+{
+  "shortDescription": "This extension implements replicating snapshotable 
directories on HDFS from one Hadoop cluster to another.",
+  "properties":[
+    {
+      "propertyName":"jobName",
+      "required":true,
+      "description":"Unique hdfs snapshot mirroring job name",
+      "example":"hdfs-snapshot-daily-mirror"
+    },
+    {
+      "propertyName":"jobClusterName",
+      "required":true,
+      "description":"Cluster where job should run",
+      "example":"backupCluster"
+    },
+    {
+      "propertyName":"jobValidityStart",
+      "required":true,
+      "description":"Job cluster validity start time",
+      "example":"2016-03-03T00:00Z"
+    },
+    {
+      "propertyName":"jobValidityEnd",
+      "required":true,
+      "description":"Job cluster validity end time",
+      "example":"2018-03-13T00:00Z"
+    },
+    {
+      "propertyName":"jobFrequency",
+      "required":true,
+      "description":"Frequency of mirroring job. Valid frequency types are 
minutes(int), hours(int), days(int), months(int)",
+      "example":"months(1)"
+    },
+    {
+      "propertyName":"jobTimezone",
+      "required":false,
+      "description":"Time zone for the job",
+      "example":"UTC"
+    },
+    {
+      "propertyName":"jobTags",
+      "required":false,
+      "description":"List of comma separated tags. Key Value pairs, separated 
by comma",
+      "example":"[email protected], [email protected], 
_department_type=forecasting"
+    },
+    {
+      "propertyName":"jobRetryPolicy",
+      "required":false,
+      "description":"Job retry policy",
+      "example":"periodic"
+    },
+    {
+      "propertyName":"jobRetryDelay",
+      "required":false,
+      "description":"Job retry delay",
+      "example":"minutes(30)"
+    },
+    {
+      "propertyName":"jobRetryAttempts",
+      "required":false,
+      "description":"Job retry attempts",
+      "example":"3"
+    },
+    {
+      "propertyName":"jobAclOwner",
+      "required":false,
+      "description":"ACL owner",
+      "example":"ambari-qa"
+    },
+    {
+      "propertyName":"jobAclGroup",
+      "required":false,
+      "description":"ACL group",
+      "example":"users"
+    },
+    {
+      "propertyName":"jobAclPermission",
+      "required":false,
+      "description":"ACL permission",
+      "example":"*"
+    },
+    {
+      "propertyName":"sourceCluster",
+      "required":true,
+      "description":"Source cluster for hdfs snapshot replication",
+      "example":"primaryCluster"
+    },
+    {
+      "propertyName":"sourceSnapshotDir",
+      "required":true,
+      "description":"Snapshot-able source directory which should be 
replicated",
+      "example":"/user/ambari-qa/snapshot/test/primaryCluster/input"
+    },
+    {
+      "propertyName":"sourceSnapshotRetentionPolicy",
+      "required":false,
+      "description":"Retention policy for snapshots created on source. Default 
is delete (Right now,only delete is supported)",
+      "example":"delete"
+    },
+    {
+      "propertyName":"sourceSnapshotRetentionAgeLimit",
+      "required":true,
+      "description":"Snapshots on source older than this age limit will be 
eligible for deletion.",
+      "example":"days(7)"
+    },
+    {
+      "propertyName":"sourceSnapshotRetentionNumber",
+      "required":true,
+      "description":"These many latest snapshots on source will be retained, 
the rest of them eligible for deletion.",
+      "example":"10"
+    },
+    {
+      "propertyName":"targetCluster",
+      "required":true,
+      "description":"Target cluster for hdfs snapshot replication",
+      "example":"backupCluster"
+    },
+    {
+      "propertyName":"targetSnapshotDir",
+      "required":true,
+      "description":"Snapshot-able target directory to which source should be 
replicated",
+      "example":"/user/ambari-qa/snapshot/test/backupCluster/replica/"
+    },
+    {
+      "propertyName":"targetSnapshotRetentionPolicy",
+      "required":false,
+      "description":"Retention policy for snapshots created on target. Default 
is delete (Right now,only delete is supported)",
+      "example":"delete"
+    },
+    {
+      "propertyName":"targetSnapshotRetentionAgeLimit",
+      "required":true,
+      "description":"Snapshots on target older than this age limit will be 
eligible for deletion.",
+      "example":"days(7)"
+    },
+    {
+      "propertyName":"targetSnapshotRetentionNumber",
+      "required":true,
+      "description":"These many latest snapshots on target will be retained, 
the rest of them eligible for deletion.",
+      "example":"10"
+    },
+    {
+      "propertyName":"distcpMaxMaps",
+      "required":false,
+      "description":"Maximum number of mappers for DistCP",
+      "example":"1"
+    },
+    {
+      "propertyName":"distcpMapBandwidth",
+      "required":false,
+      "description":"Bandwidth in MB for each mapper in DistCP",
+      "example":"100"
+    },
+    {
+      "propertyName":"tdeEncryptionEnabled",
+      "required":false,
+      "description":"Specify if TDE based encryption is enabled on source and 
target dirs",
+      "example":"false"
+    },
+    {
+      "propertyName":"jobNotificationType",
+      "required":false,
+      "description":"Email Notification for Falcon instance completion",
+      "example":"email"
+    },
+    {
+      "propertyName":"jobNotificationReceivers",
+      "required":false,
+      "description":"Comma separated email Id's",
+      "example":"[email protected], [email protected]"
+    }
+  ]
+}
\ No newline at end of file
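
Pulled together from the example values above, a job properties file for this extension might look as follows (cluster names and paths are illustrative; how the file is submitted is covered by the documentation tracked in FALCON-1908):

jobName=hdfs-snapshot-daily-mirror
jobClusterName=backupCluster
jobValidityStart=2016-03-03T00:00Z
jobValidityEnd=2018-03-13T00:00Z
jobFrequency=days(1)
sourceCluster=primaryCluster
sourceSnapshotDir=/user/ambari-qa/snapshot/test/primaryCluster/input
sourceSnapshotRetentionAgeLimit=days(7)
sourceSnapshotRetentionNumber=10
targetCluster=backupCluster
targetSnapshotDir=/user/ambari-qa/snapshot/test/backupCluster/replica
targetSnapshotRetentionAgeLimit=days(7)
targetSnapshotRetentionNumber=10
distcpMaxMaps=1
distcpMapBandwidth=100
tdeEncryptionEnabled=false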

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml
new file mode 100644
index 0000000..29131da
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<process name="##jobName##" xmlns="uri:falcon:process:0.1">
+    <clusters>
+        <!--  source  -->
+        <cluster name="##jobClusterName##">
+            <validity end="##jobValidityEnd##" start="##jobValidityStart##"/>
+        </cluster>
+    </clusters>
+
+    <tags/>
+
+    <parallel>1</parallel>
+    <!-- Replication needs to run only once to catch up -->
+    <order>LAST_ONLY</order>
+    <frequency>##jobFrequency##</frequency>
+    <timezone>##jobTimezone##</timezone>
+
+    <properties>
+        <property name="oozie.wf.subworkflow.classpath.inheritance" 
value="true"/>
+    </properties>
+
+    <workflow name="##jobWorkflowName##" engine="##jobWorkflowEngine##"
+              path="##jobWorkflowPath##" lib="##jobWorkflowLibPath##"/>
+    <retry policy="##jobRetryPolicy##" delay="##jobRetryDelay##" attempts="##jobRetryAttempts##"/>
+    <notification type="##jobNotificationType##" to="##jobNotificationReceivers##"/>
+    <ACL/>
+</process>
\ No newline at end of file
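
The ##token## placeholders in this template are resolved from the extension properties at submission time (ExtensionProcessBuilderUtils, touched by this patch, performs the substitution). Conceptually the expansion is plain string replacement; a hedged sketch, not Falcon's actual implementation:

import java.util.Map;

// Sketch only: replace each ##key## token with its property value.
public final class TemplateExpandSketch {
    static String expand(String template, Map<String, String> props) {
        for (Map.Entry<String, String> e : props.entrySet()) {
            template = template.replace("##" + e.getKey() + "##", e.getValue());
        }
        return template;
    }
}

For example, jobName=hdfs-snapshot-daily-mirror turns ##jobName## in the process element into that name.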

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
new file mode 100644
index 0000000..c735167
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
@@ -0,0 +1,172 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-hdfs-snapshot-mirroring'>
+    <start to='snapshot-replication'/>
+    <!-- Snapshot replication action -->
+    <action name="snapshot-replication">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.oozie.libpath</name>
+                    <value>${wf:conf("falcon.libpath")}</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.snapshots.replication.HdfsSnapshotReplicator</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-distcpMaxMaps</arg>
+            <arg>${distcpMaxMaps}</arg>
+            <arg>-distcpMapBandwidth</arg>
+            <arg>${distcpMapBandwidth}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-sourceExecUrl</arg>
+            <arg>${sourceExecUrl}</arg>
+            <arg>-sourceNNKerberosPrincipal</arg>
+            <arg>${sourceNNKerberosPrincipal}</arg>
+            <arg>-sourceSnapshotDir</arg>
+            <arg>${sourceSnapshotDir}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-targetExecUrl</arg>
+            <arg>${targetExecUrl}</arg>
+            <arg>-targetNNKerberosPrincipal</arg>
+            <arg>${targetNNKerberosPrincipal}</arg>
+            <arg>-targetSnapshotDir</arg>
+            <arg>${targetSnapshotDir}</arg>
+            <arg>-tdeEncryptionEnabled</arg>
+            <arg>${tdeEncryptionEnabled}</arg>
+            <arg>-snapshotJobName</arg>
+            <arg>${snapshotJobName}-${nominalTime}</arg>
+        </java>
+        <ok to="snapshot-retention"/>
+        <error to="fail"/>
+    </action>
+    <!-- Snapshot retention action -->
+    <action name="snapshot-retention">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.oozie.libpath</name>
+                    <value>${wf:conf("falcon.libpath")}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.snapshots.retention.HdfsSnapshotEvictor</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-sourceExecUrl</arg>
+            <arg>${sourceExecUrl}</arg>
+            <arg>-sourceNNKerberosPrincipal</arg>
+            <arg>${sourceNNKerberosPrincipal}</arg>
+            <arg>-sourceSnapshotDir</arg>
+            <arg>${sourceSnapshotDir}</arg>
+            <arg>-sourceSnapshotRetentionPolicy</arg>
+            <arg>${sourceSnapshotRetentionPolicy}</arg>
+            <arg>-sourceSnapshotRetentionAgeLimit</arg>
+            <arg>${sourceSnapshotRetentionAgeLimit}</arg>
+            <arg>-sourceSnapshotRetentionNumber</arg>
+            <arg>${sourceSnapshotRetentionNumber}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-targetExecUrl</arg>
+            <arg>${targetExecUrl}</arg>
+            <arg>-targetNNKerberosPrincipal</arg>
+            <arg>${targetNNKerberosPrincipal}</arg>
+            <arg>-targetSnapshotDir</arg>
+            <arg>${targetSnapshotDir}</arg>
+            <arg>-targetSnapshotRetentionPolicy</arg>
+            <arg>${targetSnapshotRetentionPolicy}</arg>
+            <arg>-targetSnapshotRetentionAgeLimit</arg>
+            <arg>${targetSnapshotRetentionAgeLimit}</arg>
+            <arg>-targetSnapshotRetentionNumber</arg>
+            <arg>${targetSnapshotRetentionNumber}</arg>
+            <arg>-snapshotJobName</arg>
+            <arg>${snapshotJobName}-${nominalTime}</arg>
+        </java>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>
+            Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+        </message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
\ No newline at end of file
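
Each <java> action above boils down to invoking the corresponding main class with the listed arguments. As a sketch (argument values are illustrative; the flags themselves match the options parsed by HdfsSnapshotReplicator below, and the real action builds its Configuration from the Oozie launcher):

// Sketch: invoke the replicator the way the Oozie <java> action does.
public class RunReplicatorSketch {
    public static void main(String[] cliArgs) throws Exception {
        String[] args = {
            "-distcpMaxMaps", "1",
            "-distcpMapBandwidth", "100",
            "-sourceNN", "hdfs://primaryNN:8020",
            "-sourceExecUrl", "primaryRM:8050",
            "-sourceSnapshotDir", "/user/ambari-qa/snapshot/test/primaryCluster/input",
            "-targetNN", "hdfs://backupNN:8020",
            "-targetExecUrl", "backupRM:8050",
            "-targetSnapshotDir", "/user/ambari-qa/snapshot/test/backupCluster/replica",
            "-tdeEncryptionEnabled", "false",
            "-snapshotJobName", "hdfs-snapshot-daily-mirror"
        };
        org.apache.hadoop.util.ToolRunner.run(
                new org.apache.hadoop.conf.Configuration(),
                new org.apache.falcon.snapshots.replication.HdfsSnapshotReplicator(), args);
    }
}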

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/README
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/README b/addons/hdfs-snapshot-mirroring/README
new file mode 100644
index 0000000..fc33d3a
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/README
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HDFS Snapshot Mirroring Extension.
+
+Overview
+This extension replicates snapshot-able directories on HDFS from one
+Hadoop cluster to another. It piggybacks on the snapshot capability in HDFS (HDFS-7535)
+and also performs retention on the snapshots generated on source and target.
+
+Use Case
+* Create a snapshot of the source directory
+* Copy the directory between HDFS clusters
+* Create a snapshot of the target directory
+* Handle snapshot retention in the source and target directories
+
+Limitations
+If TDE encryption is enabled, this snapshot-based replication is not efficient.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/pom.xml b/addons/hdfs-snapshot-mirroring/pom.xml
new file mode 100644
index 0000000..5240d62
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/pom.xml
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.falcon</groupId>
+        <artifactId>falcon-main</artifactId>
+        <version>0.10-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <artifactId>falcon-hdfs-snapshot-mirroring</artifactId>
+    <description>Apache Falcon HDFS Snapshot based Replication Module</description>
+    <name>Apache Falcon HDFS Snapshot Replication</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+        <!-- intra-project -->
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+        <!-- inter-project -->
+        <dependency>
+            <groupId>javax.jms</groupId>
+            <artifactId>jms</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-test-util</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-extensions</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-hadoop-dependencies</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-metrics</artifactId>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                    <classifier>tests</classifier>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                    <classifier>tests</classifier>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                    <scope>compile</scope>
+                    <version>${hadoop.version}</version>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-common</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.derby</groupId>
+                    <artifactId>derby</artifactId>
+                    <version>${derby.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
+    <build>
+        <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+        <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemProperties>
+                        <property>
+                            <name>derby.stream.error.file</name>
+                            <value>target/derby.log</value>
+                        </property>
+                    </systemProperties>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
new file mode 100644
index 0000000..2e41cc0
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.replication;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * HDFS snapshot generator and snapshot based replicator.
+ */
+public class HdfsSnapshotReplicator extends Configured implements Tool {
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsSnapshotReplicator.class);
+    protected CommandLine cmd;
+
+    public static void main(String[] args) throws Exception {
+        Configuration conf = OozieActionConfigurationHelper.createActionConf();
+        int ret = ToolRunner.run(conf, new HdfsSnapshotReplicator(), args);
+        if (ret != 0) {
+            throw new Exception("Unable to perform Snapshot based replication action args: " + Arrays.toString(args));
+        }
+    }
+
+    @Override
+    public int run(String[] args) throws FalconException {
+        cmd = getCommand(args);
+
+        String sourceStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_NN.getName());
+        String targetStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_NN.getName());
+
+        DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd);
+        DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd);
+
+        String currentSnapshotName = HdfsSnapshotUtil.SNAPSHOT_PREFIX
+                + cmd.getOptionValue(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName())
+                + "-" + System.currentTimeMillis();
+        String sourceDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName());
+        String targetDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName());
+
+        // Generate snapshot on source.
+        createSnapshotInFileSystem(sourceDir, currentSnapshotName, sourceFs);
+
+        // Find the most recently replicated snapshot. If it exists, distCp using the snapshots.
+        // If not, do a regular distCp as this is the first time the job is running.
+        invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
+                sourceDir, targetDir, currentSnapshotName);
+
+        // Generate snapshot on target if distCp succeeds.
+        createSnapshotInFileSystem(targetDir, currentSnapshotName, targetFs);
+
+        LOG.info("Completed HDFS Snapshot Replication.");
+        return 0;
+    }
+
+    private static void createSnapshotInFileSystem(String dirName, String snapshotName,
+                                                   FileSystem fs) throws FalconException {
+        try {
+            LOG.info("Creating snapshot {} in directory {}", snapshotName, 
dirName);
+            fs.createSnapshot(new Path(dirName), snapshotName);
+        } catch (IOException e) {
+            LOG.warn("Unable to create snapshot {} in filesystem {}. Exception 
is {}",
+                    snapshotName, 
fs.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY), e.getMessage());
+            throw new FalconException("Unable to create snapshot " + 
snapshotName, e);
+        }
+    }
+
+    protected void invokeCopy(String sourceStorageUrl, String targetStorageUrl,
+                              DistributedFileSystem sourceFs, DistributedFileSystem targetFs,
+                              String sourceDir, String targetDir,
+                              String currentSnapshotName) throws FalconException {
+        try {
+            Configuration jobConf = this.getConf();
+            DistCpOptions options = getDistCpOptions(sourceStorageUrl, targetStorageUrl,
+                    sourceFs, targetFs, sourceDir, targetDir, currentSnapshotName);
+            DistCp distCp = new DistCp(jobConf, options);
+            LOG.info("Started Snapshot based DistCp from {} to {} ", getStagingUri(sourceStorageUrl, sourceDir),
+                    getStagingUri(targetStorageUrl, targetDir));
+            Job distcpJob = distCp.execute();
+            LOG.info("DistCp Hadoop job: {}", distcpJob.getJobID().toString());
+            LOG.info("Completed Snapshot based DistCp");
+
+        } catch (FalconException fe) {
+            throw fe;
+        } catch (Exception e) {
+            throw new FalconException("Unable to replicate HDFS directory 
using snapshots.", e);
+        }
+    }
+
+    private DistCpOptions getDistCpOptions(String sourceStorageUrl, String targetStorageUrl,
+                                           DistributedFileSystem sourceFs, DistributedFileSystem targetFs,
+                                           String sourceDir, String targetDir,
+                                           String currentSnapshotName) throws FalconException {
+
+        List<Path> sourceUris = new ArrayList<Path>();
+        sourceUris.add(new Path(getStagingUri(sourceStorageUrl, sourceDir)));
+
+        DistCpOptions distcpOptions = new DistCpOptions(sourceUris,
+                new Path(getStagingUri(targetStorageUrl, targetDir)));
+
+        // Settings needed for Snapshot distCp.
+        distcpOptions.setSyncFolder(true);
+        distcpOptions.setDeleteMissing(true);
+
+        // Use snapshot diff if two snapshots exist. Else treat it as a simple distCp.
+        // Get the latest replicated snapshot.
+        String replicatedSnapshotName = findLatestReplicatedSnapshot(sourceFs, targetFs, sourceDir, targetDir);
+        if (StringUtils.isNotBlank(replicatedSnapshotName)) {
+            distcpOptions.setUseDiff(true, replicatedSnapshotName, currentSnapshotName);
+        }
+
+        if (Boolean.valueOf(cmd.getOptionValue(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName()))) {
+            // TDE-encrypted data differs byte-for-byte across clusters, so skip the CRC check.
+            distcpOptions.setSkipCRC(true);
+        }
+
+        distcpOptions.setBlocking(true);
+        distcpOptions.setMaxMaps(
+                Integer.parseInt(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName())));
+        distcpOptions.setMapBandwidth(
+                Integer.parseInt(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName())));
+        return distcpOptions;
+    }
+
+    private String findLatestReplicatedSnapshot(DistributedFileSystem sourceFs, DistributedFileSystem targetFs,
+            String sourceDir, String targetDir) throws FalconException {
+        try {
+            FileStatus[] sourceSnapshots = sourceFs.listStatus(new Path(getSnapshotDir(sourceDir)));
+            Set<String> sourceSnapshotNames = new HashSet<String>();
+            for (FileStatus snapshot : sourceSnapshots) {
+                sourceSnapshotNames.add(snapshot.getPath().getName());
+            }
+
+            FileStatus[] targetSnapshots = targetFs.listStatus(new Path(getSnapshotDir(targetDir)));
+            if (targetSnapshots.length > 0) {
+                // Sort target snapshots in descending order of modification time.
+                Arrays.sort(targetSnapshots, new Comparator<FileStatus>() {
+                    @Override
+                    public int compare(FileStatus f1, FileStatus f2) {
+                        return Long.compare(f2.getModificationTime(), f1.getModificationTime());
+                    }
+                });
+
+                // get most recent snapshot name that exists in source.
+                for (int i = 0; i < targetSnapshots.length; i++) {
+                    String name = targetSnapshots[i].getPath().getName();
+                    if (sourceSnapshotNames.contains(name)) {
+                        return name;
+                    }
+                }
+                // If control reaches here,
+                // there are snapshots on target, but none are replicated from source. Return null.
+            } // No target snapshots, return null
+            return null;
+        } catch (IOException e) {
+            LOG.error("Unable to find latest snapshot on targetDir {} {}", 
targetDir, e.getMessage());
+            throw new FalconException("Unable to find latest snapshot on 
targetDir " + targetDir, e);
+        }
+    }
+
+    private String getStagingUri(String storageUrl, String dir) {
+        storageUrl = StringUtils.removeEnd(storageUrl, Path.SEPARATOR);
+        return storageUrl + Path.SEPARATOR + dir;
+    }
+
+    private String getSnapshotDir(String dirName) {
+        dirName = StringUtils.removeEnd(dirName, Path.SEPARATOR);
+        return dirName + Path.SEPARATOR + HdfsSnapshotUtil.SNAPSHOT_DIR_PREFIX + Path.SEPARATOR;
+    }
+
+    protected CommandLine getCommand(String[] args) throws FalconException {
+        Options options = new Options();
+
+        Option opt = new Option(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(),
+                true, "max number of maps to use for distcp");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(),
+                true, "Bandwidth in MB/s used by each mapper during replication");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), true, "Source NN");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(),
+                true, "Replication instance job Exec Url");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(),
+                true, "Replication instance job NN Kerberos Principal");
+        opt.setRequired(false);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
+                true, "Source snapshot-able dir to replicate");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_NN.getName(), true, "Target NN");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(),
+                true, "Replication instance target Exec Url");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(),
+                true, "Replication instance target NN Kerberos Principal");
+        opt.setRequired(false);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
+                true, "Target snapshot-able dir to replicate");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(),
+                true, "Is TDE encryption enabled on dirs being replicated?");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(),
+                true, "Replication instance job name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        try {
+            return new GnuParser().parse(options, args);
+        } catch (ParseException pe) {
+            LOG.info("Unabel to parse commad line arguments for 
HdfsSnapshotReplicator " + pe.getMessage());
+            throw  new FalconException(pe.getMessage());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
new file mode 100644
index 0000000..22e3377
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.retention;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.retention.EvictionHelper;
+import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.jsp.el.ELException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * HDFS snapshot evictor.
+ */
+public class HdfsSnapshotEvictor extends Configured implements Tool {
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsSnapshotEvictor.class);
+
+    public static void main(String[] args) throws Exception {
+        Configuration conf = OozieActionConfigurationHelper.createActionConf();
+        int ret = ToolRunner.run(conf, new HdfsSnapshotEvictor(), args);
+        if (ret != 0) {
+            throw new Exception("Unable to perform eviction action args: " + Arrays.toString(args));
+        }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        CommandLine cmd = getCommand(args);
+        DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd);
+        DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd);
+
+        String sourceDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName());
+        String targetDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName());
+
+        // evict on source
+        String retPolicy = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName());
+        String ageLimit = cmd.getOptionValue(
+                HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName());
+        int numSnapshots = Integer.parseInt(
+                cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName()));
+        if (retPolicy.equalsIgnoreCase("delete")) {
+            evictSnapshots(sourceFs, sourceDir, ageLimit, numSnapshots);
+        } else {
+            LOG.warn("Unsupported source retention policy {}", retPolicy);
+            throw new FalconException("Unsupported source retention policy " + 
retPolicy);
+        }
+
+        // evict on target
+        retPolicy = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName());
+        ageLimit = cmd.getOptionValue(
+                HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName());
+        numSnapshots = Integer.parseInt(
+                cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName()));
+        if (retPolicy.equalsIgnoreCase("delete")) {
+            evictSnapshots(targetFs, targetDir, ageLimit, numSnapshots);
+        } else {
+            LOG.warn("Unsupported target retention policy {}", retPolicy);
+            throw new FalconException("Unsupported target retention policy " + 
retPolicy);
+        }
+
+        LOG.info("Completed HDFS Snapshot Eviction.");
+        return 0;
+    }
+
+    protected static void evictSnapshots(DistributedFileSystem fs, String dirName, String ageLimit,
+                                         int numSnapshots) throws FalconException {
+        try {
+            LOG.info("Started evicting snapshots on dir {}{}, agelimit {}, numSnapshot {}",
+                    fs.getUri(), dirName, ageLimit, numSnapshots);
+
+            long evictionTime = System.currentTimeMillis() - EvictionHelper.evalExpressionToMilliSeconds(ageLimit);
+
+            dirName = StringUtils.removeEnd(dirName, Path.SEPARATOR);
+            String snapshotDir = dirName + Path.SEPARATOR + HdfsSnapshotUtil.SNAPSHOT_DIR_PREFIX + Path.SEPARATOR;
+            FileStatus[] snapshots = fs.listStatus(new Path(snapshotDir));
+            if (snapshots.length <= numSnapshots) {
+                // no eviction needed
+                return;
+            }
+
+            // Sort by last modified time, ascending order.
+            Arrays.sort(snapshots, new Comparator<FileStatus>() {
+                @Override
+                public int compare(FileStatus f1, FileStatus f2) {
+                    return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+                }
+            });
+
+            for (int i = 0; i < (snapshots.length - numSnapshots); i++) {
+                // delete if older than ageLimit while retaining numSnapshots
+                if (snapshots[i].getModificationTime() < evictionTime) {
+                    fs.deleteSnapshot(new Path(dirName), snapshots[i].getPath().getName());
+                }
+            }
+
+        } catch (ELException ele) {
+            LOG.warn("Unable to parse retention age limit {} {}", ageLimit, ele.getMessage());
+            throw new FalconException("Unable to parse retention age limit " + ageLimit, ele);
+        } catch (IOException ioe) {
+            LOG.warn("Unable to evict snapshots from dir {} {}", dirName, ioe);
+            throw new FalconException("Unable to evict snapshots from dir " + dirName, ioe);
+        }
+
+    }
+
+    private CommandLine getCommand(String[] args) throws org.apache.commons.cli.ParseException {
+        Options options = new Options();
+
+        Option opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), true, "Source Cluster");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(),
+                true, "Replication instance job Exec Url");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(),
+                true, "Replication instance job NN Kerberos Principal");
+        opt.setRequired(false);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
+                true, "Source snapshot-able dir to replicate");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_NN.getName(), true, "Target Cluster");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
+                true, "Target snapshot-able dir to replicate");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(),
+                true, "Replication instance target Exec Url");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(),
+                true, "Replication instance target NN Kerberos Principal");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), true,
+                "Replication instance job name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(),
+                true, "Source retention policy");
+        opt.setRequired(false);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+                true, "Source delete snapshots older than agelimit");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName(),
+                true, "Source number of snapshots to retain");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(),
+                true, "Target retention policy");
+        opt.setRequired(false);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+                true, "Target delete snapshots older than agelimit");
+        opt.setRequired(true);
+        options.addOption(opt);
+        opt = new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName(),
+                true, "Target number of snapshots to retain");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return new GnuParser().parse(options, args);
+    }
+}
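
For context, evictSnapshots() above deletes a snapshot only when both retention conditions hold: the snapshot is older than the EL age limit and more than numSnapshots copies would still remain. A minimal sketch of driving it the way the unit test below does (the subclass, the /data/mirror path, and the "days(7)" retention expression are illustrative assumptions, not part of this patch):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.snapshots.retention.HdfsSnapshotEvictor;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class EvictionSketch extends HdfsSnapshotEvictor {
        public void run(DistributedFileSystem fs) throws FalconException {
            // Hypothetical values: evict snapshots under /data/mirror older than
            // the "days(7)" EL expression while always retaining at least 3.
            evictSnapshots(fs, "/data/mirror", "days(7)", 3);
        }
    }

Subclassing mirrors HdfsSnapshotEvictorTest below, which likewise calls evictSnapshots() directly rather than going through the workflow entry point.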

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
new file mode 100644
index 0000000..5196791
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.util;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+/**
+ * Util class for HDFS snapshot based mirroring.
+ */
+public final class HdfsSnapshotUtil {
+
+    public static final String SNAPSHOT_PREFIX = "falcon-snapshot-";
+    public static final String SNAPSHOT_DIR_PREFIX = ".snapshot";
+
+    private HdfsSnapshotUtil() {}
+
+    public static DistributedFileSystem getSourceFileSystem(CommandLine cmd) throws FalconException {
+        String sourceStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_NN.getName());
+        String sourceExecuteEndpoint = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName());
+        String sourcePrincipal = parseKerberosPrincipal(cmd.getOptionValue(
+                HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName()));
+        Configuration sourceConf = ClusterHelper.getConfiguration(sourceStorageUrl,
+                sourceExecuteEndpoint, sourcePrincipal);
+        return HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf);
+    }
+
+    public static DistributedFileSystem getTargetFileSystem(CommandLine cmd) throws FalconException {
+        String targetStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_NN.getName());
+        String targetExecuteEndpoint = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName());
+        String targetPrincipal = parseKerberosPrincipal(cmd.getOptionValue(
+                HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName()));
+
+        Configuration targetConf = ClusterHelper.getConfiguration(targetStorageUrl,
+                targetExecuteEndpoint, targetPrincipal);
+        return HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf);
+    }
+
+    public static String parseKerberosPrincipal(String principal) {
+        // Null-safe: the principal options are optional, so guard before comparing to the sentinel.
+        if (principal == null
+                || principal.equals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL)) {
+            return null;
+        }
+        return principal;
+    }
+}
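
The sentinel handling above matters because the Kerberos principal options are optional on the command line. A small sketch of the intended behavior (the principal string is a made-up example):

    import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;
    import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;

    public class PrincipalSketch {
        public static void main(String[] args) {
            // A real principal passes through unchanged (value is illustrative).
            System.out.println(HdfsSnapshotUtil.parseKerberosPrincipal("nn/[email protected]"));
            // The extension's sentinel maps to null, so ClusterHelper.getConfiguration()
            // never sets the NameNode principal key on an unsecure cluster.
            System.out.println(HdfsSnapshotUtil.parseKerberosPrincipal(
                    HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL));
        }
    }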

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
new file mode 100644
index 0000000..7924214
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.replication;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.MiniHdfsClusterUtil;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+
+/**
+ * Hdfs Snapshot replicator unit tests.
+ */
+public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator {
+    private MiniDFSCluster miniDFSCluster;
+    private DistributedFileSystem miniDfs;
+    private File baseDir;
+    private Cluster sourceCluster;
+    private Cluster targetCluster;
+    private String sourceStorageUrl;
+    private String targetStorageUrl;
+    private Path sourceDir = new Path("/apps/falcon/snapshot-replication/sourceDir/");
+    private Path targetDir = new Path("/apps/falcon/snapshot-replication/targetDir/");
+
+    private FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+
+    private String[] args = {"--" + HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1",
+        "--" + HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), "100",
+        "--" + HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), "hdfs://localhost:54136",
+        "--" + HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(), "localhost:8021",
+        "--" + HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(), "localhost:8021",
+        "--" + HdfsSnapshotMirrorProperties.TARGET_NN.getName(), "hdfs://localhost:54136",
+        "--" + HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
+        "/apps/falcon/snapshot-replication/sourceDir/",
+        "--" + HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
+        "/apps/falcon/snapshot-replication/targetDir/",
+        "--" + HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "false",
+        "--" + HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), "snapshotJobName", };
+
+    @BeforeClass
+    public void init() throws Exception {
+        baseDir = Files.createTempDirectory("test_snapshot-replication").toFile().getAbsoluteFile();
+        miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.SNAPSHOT_REPL_TEST_PORT, baseDir);
+        miniDfs = miniDFSCluster.getFileSystem();
+
+        sourceCluster = initCluster("/primary-cluster-0.1.xml");
+        targetCluster = initCluster("/backup-cluster-0.1.xml");
+
+        miniDfs.mkdirs(sourceDir, fsPermission);
+        miniDfs.mkdirs(targetDir, fsPermission);
+
+        miniDfs.allowSnapshot(sourceDir);
+        miniDfs.allowSnapshot(targetDir);
+
+        cmd = getCommand(args);
+        Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()), "1");
+        Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName()), "100");
+
+    }
+
+    private Cluster initCluster(String clusterName) throws Exception {
+        InputStream inputStream = getClass().getResourceAsStream(clusterName);
+        Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(inputStream);
+        ConfigurationStore.get().publish(EntityType.CLUSTER, cluster);
+        return cluster;
+    }
+
+    @Test
+    public void replicationTest() throws Exception {
+        Configuration sourceConf = ClusterHelper.getConfiguration(sourceCluster);
+        this.setConf(sourceConf);
+        Configuration targetConf = ClusterHelper.getConfiguration(targetCluster);
+        sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
+        targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster);
+
+        DistributedFileSystem sourceFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf);
+        DistributedFileSystem targetFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf);
+
+        // create dir1, create snapshot, invoke copy, check file in target, create snapshot on target
+        Path dir1 = new Path(sourceDir, "dir1");
+        miniDfs.mkdir(dir1, fsPermission);
+        miniDfs.createSnapshot(sourceDir, "snapshot1");
+        invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
+                sourceDir.toString(), targetDir.toString(), "snapshot1");
+        miniDfs.createSnapshot(targetDir, "snapshot1");
+        Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir1")));
+
+        // create dir2, create snapshot, invoke copy, check dir in target, create snapshot on target
+        Path dir2 = new Path(sourceDir, "dir2");
+        miniDfs.mkdir(dir2, fsPermission);
+        miniDfs.createSnapshot(sourceDir, "snapshot2");
+        invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
+                sourceDir.toString(), targetDir.toString(), "snapshot2");
+        miniDfs.createSnapshot(targetDir, "snapshot2");
+        Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir1")));
+        Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir2")));
+
+        // delete dir1, create snapshot, invoke copy, check file not in target
+        miniDfs.delete(dir1, true);
+        miniDfs.createSnapshot(sourceDir, "snapshot3");
+        invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
+                sourceDir.toString(), targetDir.toString(), "snapshot3");
+        miniDfs.createSnapshot(targetDir, "snapshot3");
+        Assert.assertFalse(miniDfs.exists(new Path(targetDir, "dir1")));
+        Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir2")));
+    }
+
+    @Test(dependsOnMethods = "replicationTest",
+            expectedExceptions = FalconException.class,
+            expectedExceptionsMessageRegExp = "Unable to find latest snapshot on targetDir "
+                    + "/apps/falcon/snapshot-replication/targetDir")
+    public void removeSnapshotabilityOnTargetTest() throws Exception {
+        // remove snapshotability on target, create snapshot on source; the copy should then fail
+        miniDfs.deleteSnapshot(targetDir, "snapshot1");
+        miniDfs.deleteSnapshot(targetDir, "snapshot2");
+        miniDfs.deleteSnapshot(targetDir, "snapshot3");
+
+        miniDfs.disallowSnapshot(targetDir);
+        Path dir1 = new Path(sourceDir, "dir4");
+        miniDfs.mkdir(dir1, fsPermission);
+        miniDfs.createSnapshot(sourceDir, "snapshot4");
+        invokeCopy(sourceStorageUrl, targetStorageUrl, miniDfs, miniDfs,
+                sourceDir.toString(), targetDir.toString(), "snapshot4");
+    }
+
+    @AfterClass
+    public void cleanup() throws Exception {
+        MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir);
+    }
+}
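
The test exercises the snapshot lifecycle the replicator relies on: mark a directory snapshottable, snapshot the source, copy, then snapshot the target so later runs have a baseline. The administration calls are stock DistributedFileSystem APIs; a condensed sketch (the URI and path are placeholders, and a running NameNode is assumed):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class SnapshotLifecycleSketch {
        public static void main(String[] args) throws Exception {
            Path dir = new Path("/apps/falcon/demo"); // illustrative path
            DistributedFileSystem dfs = (DistributedFileSystem)
                    new Path("hdfs://localhost:8020/").getFileSystem(new Configuration());
            dfs.allowSnapshot(dir);               // make the directory snapshottable
            dfs.createSnapshot(dir, "snapshot1"); // visible under <dir>/.snapshot/snapshot1
            dfs.deleteSnapshot(dir, "snapshot1");
            dfs.disallowSnapshot(dir);            // only legal once no snapshots remain
        }
    }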

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java
new file mode 100644
index 0000000..a73c399
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.MiniHdfsClusterUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+/**
+ * Hdfs snapshot evictor unit tests.
+ */
+public class HdfsSnapshotEvictorTest extends HdfsSnapshotEvictor {
+
+    private static final int NUM_FILES = 7;
+    private MiniDFSCluster miniDFSCluster;
+    private DistributedFileSystem miniDfs;
+    private File baseDir;
+    private Path evictionDir = new Path("/apps/falcon/snapshot-eviction/");
+    private FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+
+    @BeforeClass
+    public void init() throws Exception {
+        baseDir = Files.createTempDirectory("test_snapshot-eviction_hdfs").toFile().getAbsoluteFile();
+        miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.SNAPSHOT_EVICTION_TEST_PORT, baseDir);
+        miniDfs = miniDFSCluster.getFileSystem();
+        miniDfs.mkdirs(evictionDir, fsPermission);
+        miniDfs.allowSnapshot(evictionDir);
+        createSnapshotsForEviction();
+    }
+
+    private void createSnapshotsForEviction() throws Exception {
+        for (int i = 0; i < NUM_FILES; i++) {
+            miniDfs.createSnapshot(evictionDir, String.valueOf(i));
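+            // snapshots are spaced ~10s apart so the oldest ones age past the "minutes(1)" limit used below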
+            Thread.sleep(10000);
+        }
+    }
+
+    @Test
+    public void evictionTest() throws Exception {
+        Path snapshotDir = new Path(evictionDir, ".snapshot");
+        FileStatus[] fileStatuses = miniDfs.listStatus(snapshotDir);
+        Assert.assertEquals(fileStatuses.length, NUM_FILES);
+
+        evictSnapshots(miniDfs, evictionDir.toString(), "minutes(1)", NUM_FILES + 1);
+        fileStatuses = miniDfs.listStatus(snapshotDir);
+        Assert.assertEquals(fileStatuses.length, NUM_FILES);
+
+        evictSnapshots(miniDfs, evictionDir.toString(), "minutes(1)", NUM_FILES - 1);
+        fileStatuses = miniDfs.listStatus(snapshotDir);
+        Assert.assertEquals(fileStatuses.length, NUM_FILES - 1);
+
+        evictSnapshots(miniDfs, evictionDir.toString(), "minutes(1)", 2);
+        fileStatuses = miniDfs.listStatus(snapshotDir);
+        Assert.assertTrue(fileStatuses.length >= 5);
+    }
+
+    @Test(expectedExceptions = FalconException.class,
+            expectedExceptionsMessageRegExp = "Unable to evict snapshots from dir /apps/falcon/non-snapshot-eviction")
+    public void evictionTestCannotSnapshot() throws Exception {
+        Path nonSnapshotDir = new Path("/apps/falcon/non-snapshot-eviction/");
+        miniDfs.mkdirs(nonSnapshotDir, fsPermission);
+        evictSnapshots(miniDfs, nonSnapshotDir.toString(), "minutes(1)", NUM_FILES);
+    }
+
+    @AfterClass
+    public void cleanup() throws Exception {
+        MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml b/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml
new file mode 100644
index 0000000..f89237e
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<cluster colo="gs" description="" name="backupSnapshotRepl" xmlns="uri:falcon:cluster:0.1"
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:54136"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="4.0"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="Hcat" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+        <property name="hive.metastore.client.socket.timeout" value="20"/>
+    </properties>
+</cluster>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml b/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml
new file mode 100644
index 0000000..30c6242
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<cluster colo="gs" description="" name="primarySnapshotRepl" xmlns="uri:falcon:cluster:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:54136"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="4.0"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="Hcat" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+        <property name="hive.metastore.client.socket.timeout" value="20"/>
+    </properties>
+</cluster>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 24ba7d7..9d79742 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.entity;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
@@ -28,6 +29,7 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.cluster.Location;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.security.SecurityUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -71,6 +73,18 @@ public final class ClusterHelper {
         return conf;
     }
 
+    public static Configuration getConfiguration(String storageUrl, String executeEndPoint,
+                                                 String kerberosPrincipal) {
+        Configuration conf = new Configuration();
+        conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
+        conf.set(HadoopClientFactory.MR_JT_ADDRESS_KEY, executeEndPoint);
+        conf.set(HadoopClientFactory.YARN_RM_ADDRESS_KEY, executeEndPoint);
+        if (StringUtils.isNotBlank(kerberosPrincipal)) {
+            conf.set(SecurityUtil.NN_PRINCIPAL, kerberosPrincipal);
+        }
+        return conf;
+    }
+
     public static String getOozieUrl(Cluster cluster) {
         return getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint();
     }
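
The new three-argument getConfiguration() lets the snapshot jobs build a Configuration straight from endpoints carried on the command line, with no Cluster entity in scope. A minimal sketch (the endpoints are placeholders; a null or blank principal simply skips the NameNode principal key, as on an unsecure cluster):

    import org.apache.falcon.entity.ClusterHelper;
    import org.apache.falcon.hadoop.HadoopClientFactory;
    import org.apache.hadoop.conf.Configuration;

    public class ConfSketch {
        public static void main(String[] args) {
            // Placeholder endpoints: substitute the real NameNode and RM addresses.
            Configuration conf = ClusterHelper.getConfiguration(
                    "hdfs://nn.example.com:8020", "rm.example.com:8050", null);
            System.out.println(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY));
        }
    }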

http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index e970439..3d6b16b 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -132,6 +133,28 @@ public final class HadoopClientFactory {
         }
     }
 
+    /**
+     * Return a DistributedFileSystem created with the authenticated proxy user for the specified conf.
+     *
+     * @param conf Configuration with all necessary information to create the FileSystem.
+     * @return DistributedFileSystem created with the provided proxyUser/group.
+     * @throws org.apache.falcon.FalconException
+     *          if the filesystem could not be created.
+     */
+    public DistributedFileSystem createDistributedProxiedFileSystem(final Configuration conf) throws FalconException {
+        Validate.notNull(conf, "configuration cannot be null");
+
+        String nameNode = getNameNode(conf);
+        try {
+            return createDistributedFileSystem(CurrentUser.getProxyUGI(), new URI(nameNode), conf);
+        } catch (URISyntaxException e) {
+            throw new FalconException("Exception while getting Distributed FileSystem for: " + nameNode, e);
+        } catch (IOException e) {
+            throw new FalconException("Exception while getting Distributed FileSystem for proxy: "
+                    + CurrentUser.getUser(), e);
+        }
+    }
+
     private static String getNameNode(Configuration conf) {
         return conf.get(FS_DEFAULT_NAME_KEY);
     }
@@ -172,19 +195,7 @@ public final class HadoopClientFactory {
     @SuppressWarnings("ResultOfMethodCallIgnored")
     public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri,
                                        final Configuration conf) throws FalconException {
-        Validate.notNull(ugi, "ugi cannot be null");
-        Validate.notNull(conf, "configuration cannot be null");
-
-        try {
-            if (UserGroupInformation.isSecurityEnabled()) {
-                ugi.checkTGTAndReloginFromKeytab();
-            }
-        } catch (IOException ioe) {
-            throw new FalconException("Exception while getting FileSystem. Unable to check TGT for user "
-                    + ugi.getShortUserName(), ioe);
-        }
-
-        validateNameNode(uri, conf);
+        validateInputs(ugi, uri, conf);
 
         try {
             // prevent falcon impersonating falcon, no need to use doas
@@ -201,13 +212,65 @@ public final class HadoopClientFactory {
                     return FileSystem.get(uri, conf);
                 }
             });
-        } catch (InterruptedException ex) {
+        } catch (InterruptedException | IOException ex) {
-            throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
-        } catch (IOException ex) {
+        }
+    }
+
+    /**
+     * Return a DistributedFileSystem created with the provided user for the specified URI.
+     *
+     * @param ugi user group information
+     * @param uri  file system URI.
+     * @param conf Configuration with all necessary information to create the FileSystem.
+     * @return DistributedFileSystem created with the provided user/group.
+     * @throws org.apache.falcon.FalconException
+     *          if the filesystem could not be created.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public DistributedFileSystem createDistributedFileSystem(UserGroupInformation ugi, final URI uri,
+                                                             final Configuration conf) throws FalconException {
+        validateInputs(ugi, uri, conf);
+        FileSystem returnFs;
+        try {
+            // prevent falcon impersonating falcon, no need to use doas
+            final String proxyUserName = ugi.getShortUserName();
+            if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
+                LOG.info("Creating Distributed FS for the login user {}, impersonation not required",
+                        proxyUserName);
+                returnFs = DistributedFileSystem.get(uri, conf);
+            } else {
+                LOG.info("Creating FS impersonating user {}", proxyUserName);
+                returnFs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                    public FileSystem run() throws Exception {
+                        return DistributedFileSystem.get(uri, conf);
+                    }
+                });
+            }
+
+            return (DistributedFileSystem) returnFs;
+        } catch (InterruptedException | IOException ex) {
+            throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
         }
     }
 
+    private void validateInputs(UserGroupInformation ugi, final URI uri,
+                                final Configuration conf) throws FalconException {
+        Validate.notNull(ugi, "ugi cannot be null");
+        Validate.notNull(conf, "configuration cannot be null");
+
+        try {
+            if (UserGroupInformation.isSecurityEnabled()) {
+                ugi.checkTGTAndReloginFromKeytab();
+            }
+        } catch (IOException ioe) {
+            throw new FalconException("Exception while getting FileSystem. Unable to check TGT for user "
+                    + ugi.getShortUserName(), ioe);
+        }
+
+        validateNameNode(uri, conf);
+    }
+
     /**
      * This method validates if the execute url is able to reach the MR endpoint.
      *
