FALCON-1861 Support HDFS Snapshot based replication in Falcon. Documentation will be added in Jira FALCON-1908.
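At its core, the replicator added by this commit snapshots the source directory, runs DistCp (switching to a snapshot-diff copy once a previously replicated snapshot exists on both clusters), and then snapshots the target under the same name so the next run can diff against it. The following is a minimal standalone sketch of that flow, separate from the Falcon wiring in the diff below; the class name and the assumption that both paths are fully qualified HDFS URIs are illustrative only, and every Hadoop call used here also appears in the committed code.

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;

/** Hypothetical sketch of snapshot-diff based replication; not part of this commit. */
public final class SnapshotDiffCopySketch {
    private SnapshotDiffCopySketch() {}

    // fromSnapshot is the latest snapshot already replicated to the target,
    // or null on the first run (which degrades to a plain distcp).
    public static void copy(Configuration conf,
                            DistributedFileSystem sourceFs, DistributedFileSystem targetFs,
                            Path sourceDir, Path targetDir,
                            String fromSnapshot, String toSnapshot) throws Exception {
        // 1. Snapshot the source so the copy reads from an immutable image.
        sourceFs.createSnapshot(sourceDir, toSnapshot);

        // 2. Snapshot-diff DistCp requires -update/-delete semantics.
        DistCpOptions options = new DistCpOptions(
                Collections.singletonList(sourceDir), targetDir);
        options.setSyncFolder(true);
        options.setDeleteMissing(true);
        if (fromSnapshot != null) {
            // Copy only the delta recorded between the two source snapshots.
            options.setUseDiff(true, fromSnapshot, toSnapshot);
        }

        // 3. Run the copy; execute() submits the Hadoop job and, with the
        //    default blocking options, waits for it to finish.
        new DistCp(conf, options).execute();

        // 4. Snapshot the target under the same name so the next run can
        //    use it as the "from" side of the diff.
        targetFs.createSnapshot(targetDir, toSnapshot);
    }
}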
Author: bvellanki <[email protected]> Reviewers: "Sowmya <[email protected]>, sandeepSamudrala <[email protected]>, Ying Zheng <[email protected]>, Venkat Ranganathan <[email protected]>" Closes #105 from bvellanki/master Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aba79aae Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aba79aae Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aba79aae Branch: refs/heads/master Commit: aba79aae2c7235f065413fe0c9f0af5077079371 Parents: 7c0481e Author: bvellanki <[email protected]> Authored: Thu Apr 21 16:48:16 2016 -0700 Committer: bvellanki <[email protected]> Committed: Thu Apr 21 16:48:16 2016 -0700 ---------------------------------------------------------------------- .../extensions/hdfs-snapshot-mirroring/README | 31 +++ .../extensions/hdfs-snapshot-mirroring/pom.xml | 32 +++ .../hdfs-snapshot-mirroring-properties.json | 173 ++++++++++++ .../hdfs-snapshot-mirroring-template.xml | 45 +++ .../hdfs-snapshot-mirroring-workflow.xml | 172 ++++++++++++ addons/hdfs-snapshot-mirroring/README | 31 +++ addons/hdfs-snapshot-mirroring/pom.xml | 188 +++++++++++++ .../replication/HdfsSnapshotReplicator.java | 277 +++++++++++++++++++ .../retention/HdfsSnapshotEvictor.java | 208 ++++++++++++++ .../falcon/snapshots/util/HdfsSnapshotUtil.java | 67 +++++ .../replication/HdfsSnapshotReplicatorTest.java | 163 +++++++++++ .../retention/HdfsSnapshotEvictorTest.java | 98 +++++++ .../src/test/resources/backup-cluster-0.1.xml | 44 +++ .../src/test/resources/primary-cluster-0.1.xml | 43 +++ .../org/apache/falcon/entity/ClusterHelper.java | 14 + .../falcon/hadoop/HadoopClientFactory.java | 93 ++++++- .../apache/falcon/retention/EvictionHelper.java | 7 +- common/src/main/resources/startup.properties | 2 +- .../falcon/retention/EvictionHelperTest.java | 35 +++ extensions/pom.xml | 29 ++ .../falcon/extensions/AbstractExtension.java | 3 + .../HdfsSnapshotMirrorProperties.java | 84 ++++++ .../HdfsSnapshotMirroringExtension.java | 234 ++++++++++++++++ .../util/ExtensionProcessBuilderUtils.java | 2 +- .../apache/falcon/extensions/ExtensionTest.java | 165 ++++++++++- .../extensions/store/ExtensionStoreTest.java | 6 +- .../src/test/resources/backup-cluster-0.1.xml | 3 +- .../hdfs-snapshot-mirroring-template.xml | 45 +++ .../src/test/resources/primary-cluster-0.1.xml | 2 +- pom.xml | 43 ++- scheduler/src/test/resources/startup.properties | 2 +- src/conf/startup.properties | 2 +- src/main/assemblies/assembly-standalone.xml | 40 +++ src/main/assemblies/distributed-package.xml | 40 +++ src/main/assemblies/standalone-package.xml | 40 +++ test-util/pom.xml | 6 + .../cluster/util/MiniHdfsClusterUtil.java | 52 ++++ unit/pom.xml | 24 +- .../unit/LocalFalconRPCClientFactory.java | 16 +- unit/src/main/resources/startup.properties | 2 +- webapp/pom.xml | 10 + webapp/src/test/resources/startup.properties | 2 +- 42 files changed, 2537 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/README ---------------------------------------------------------------------- diff --git a/addons/extensions/hdfs-snapshot-mirroring/README b/addons/extensions/hdfs-snapshot-mirroring/README new file mode 100644 index 0000000..fc33d3a --- /dev/null +++ b/addons/extensions/hdfs-snapshot-mirroring/README @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one 
+# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +HDFS Snapshot Mirroring Extension. + +Overview +This extension implements replication for snapshot-able directories on HDFS from one +Hadoop cluster to another. This piggybacks on snapshot solution supported in HDFS (HDFS-7535). +It also performs retention on the snapshots generated in source and target. + +Use Case +* Create snapshots in source directory +* Copy this directory between HDFS clusters +* Create snapshot in target directory +* Handle snapshot retention in source and target directories + +Limitations +If TDE encryption is enabled, this snapshot based replication is not efficient. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/pom.xml ---------------------------------------------------------------------- diff --git a/addons/extensions/hdfs-snapshot-mirroring/pom.xml b/addons/extensions/hdfs-snapshot-mirroring/pom.xml new file mode 100644 index 0000000..b0b4819 --- /dev/null +++ b/addons/extensions/hdfs-snapshot-mirroring/pom.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.falcon.extensions</groupId> + <artifactId>falcon-hdfs-snapshot-mirroring-extension</artifactId> + <version>0.10-SNAPSHOT</version> + <description>Apache Falcon HDFS Snapshot Mirroring Extension</description> + <name>Apache Falcon Sample HDFS Snapshot Mirroring Extension</name> + <packaging>jar</packaging> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json ---------------------------------------------------------------------- diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json new file mode 100644 index 0000000..46554c1 --- /dev/null +++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json @@ -0,0 +1,173 @@ +{ + "shortDescription": "This extension implements replicating snapshotable directories on HDFS from one Hadoop cluster to another.", + "properties":[ + { + "propertyName":"jobName", + "required":true, + "description":"Unique hdfs snapshot mirroring job name", + "example":"hdfs-snapshot-daily-mirror" + }, + { + "propertyName":"jobClusterName", + "required":true, + "description":"Cluster where job should run", + "example":"backupCluster" + }, + { + "propertyName":"jobValidityStart", + "required":true, + "description":"Job cluster validity start time", + "example":"2016-03-03T00:00Z" + }, + { + "propertyName":"jobValidityEnd", + "required":true, + "description":"Job cluster validity end time", + "example":"2018-03-13T00:00Z" + }, + { + "propertyName":"jobFrequency", + "required":true, + "description":"Frequency of mirroring job. Valid frequency types are minutes(int), hours(int), days(int), months(int)", + "example":"months(1)" + }, + { + "propertyName":"jobTimezone", + "required":false, + "description":"Time zone for the job", + "example":"UTC" + }, + { + "propertyName":"jobTags", + "required":false, + "description":"List of comma separated tags. 
Key Value pairs, separated by comma", + "example":"[email protected], [email protected], _department_type=forecasting" + }, + { + "propertyName":"jobRetryPolicy", + "required":false, + "description":"Job retry policy", + "example":"periodic" + }, + { + "propertyName":"jobRetryDelay", + "required":false, + "description":"Job retry delay", + "example":"minutes(30)" + }, + { + "propertyName":"jobRetryAttempts", + "required":false, + "description":"Job retry attempts", + "example":"3" + }, + { + "propertyName":"jobAclOwner", + "required":false, + "description":"ACL owner", + "example":"ambari-qa" + }, + { + "propertyName":"jobAclGroup", + "required":false, + "description":"ACL group", + "example":"users" + }, + { + "propertyName":"jobAclPermission", + "required":false, + "description":"ACL permission", + "example":"*" + }, + { + "propertyName":"sourceCluster", + "required":true, + "description":"Source cluster for hdfs snapshot replication", + "example":"primaryCluster" + }, + { + "propertyName":"sourceSnapshotDir", + "required":true, + "description":"Snapshot-able source directory which should be replicated", + "example":"/user/ambari-qa/snapshot/test/primaryCluster/input" + }, + { + "propertyName":"sourceSnapshotRetentionPolicy", + "required":false, + "description":"Retention policy for snapshots created on source. Default is delete (Right now,only delete is supported)", + "example":"delete" + }, + { + "propertyName":"sourceSnapshotRetentionAgeLimit", + "required":true, + "description":"Snapshots on source older than this age limit will be eligible for deletion.", + "example":"days(7)" + }, + { + "propertyName":"sourceSnapshotRetentionNumber", + "required":true, + "description":"These many latest snapshots on source will be retained, the rest of them eligible for deletion.", + "example":"10" + }, + { + "propertyName":"targetCluster", + "required":true, + "description":"Target cluster for hdfs snapshot replication", + "example":"backupCluster" + }, + { + "propertyName":"targetSnapshotDir", + "required":true, + "description":"Snapshot-able target directory to which source should be replicated", + "example":"/user/ambari-qa/snapshot/test/backupCluster/replica/" + }, + { + "propertyName":"targetSnapshotRetentionPolicy", + "required":false, + "description":"Retention policy for snapshots created on target. 
Default is delete (Right now,only delete is supported)", + "example":"delete" + }, + { + "propertyName":"targetSnapshotRetentionAgeLimit", + "required":true, + "description":"Snapshots on target older than this age limit will be eligible for deletion.", + "example":"days(7)" + }, + { + "propertyName":"targetSnapshotRetentionNumber", + "required":true, + "description":"These many latest snapshots on target will be retained, the rest of them eligible for deletion.", + "example":"10" + }, + { + "propertyName":"distcpMaxMaps", + "required":false, + "description":"Maximum number of mappers for DistCP", + "example":"1" + }, + { + "propertyName":"distcpMapBandwidth", + "required":false, + "description":"Bandwidth in MB for each mapper in DistCP", + "example":"100" + }, + { + "propertyName":"tdeEncryptionEnabled", + "required":false, + "description":"Specify if TDE based encryption is enabled on source and target dirs", + "example":"false" + }, + { + "propertyName":"jobNotificationType", + "required":false, + "description":"Email Notification for Falcon instance completion", + "example":"email" + }, + { + "propertyName":"jobNotificationReceivers", + "required":false, + "description":"Comma separated email Id's", + "example":"[email protected], [email protected]" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml ---------------------------------------------------------------------- diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml new file mode 100644 index 0000000..29131da --- /dev/null +++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<process name="##jobName##" xmlns="uri:falcon:process:0.1"> + <clusters> + <!-- source --> + <cluster name="##jobClusterName##"> + <validity end="##jobValidityEnd##" start="##jobValidityStart##"/> + </cluster> + </clusters> + + <tags/> + + <parallel>1</parallel> + <!-- Replication needs to run only once to catch up --> + <order>LAST_ONLY</order> + <frequency>##jobFrequency##</frequency> + <timezone>##jobTimezone##</timezone> + + <properties> + <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/> + </properties> + + <workflow name="##jobWorkflowName##" engine="##jobWorkflowEngine##" + path="##jobWorkflowPath##" lib="##jobWorkflowLibPath##"/> + <retry policy="##jobRetryPolicy##" delay="##jobRetryDelay##" attempts="3"/> + <notification type="##jobNotificationType##" to="##jobNotificationReceivers##"/> + <ACL/> +</process> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml new file mode 100644 index 0000000..c735167 --- /dev/null +++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml @@ -0,0 +1,172 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-hdfs-snapshot-mirroring'> + <start to='snapshot-replication'/> + <!-- Snapshot replication action --> + <action name="snapshot-replication"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> <!-- hadoop 2 parameter --> + <name>oozie.launcher.mapreduce.job.user.classpath.first</name> + <value>true</value> + </property> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <property> + <name>oozie.use.system.libpath</name> + <value>true</value> + </property> + <property> + <name>oozie.launcher.oozie.libpath</name> + <value>${wf:conf("falcon.libpath")}</value> + </property> + <property> + <name>oozie.action.sharelib.for.java</name> + <value>distcp</value> + </property> + <property> + <name>oozie.launcher.mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + <property> + <name>mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + </configuration> + <main-class>org.apache.falcon.snapshots.replication.HdfsSnapshotReplicator</main-class> + <arg>-Dmapred.job.queue.name=${queueName}</arg> + <arg>-Dmapred.job.priority=${jobPriority}</arg> + <arg>-distcpMaxMaps</arg> + <arg>${distcpMaxMaps}</arg> + <arg>-distcpMapBandwidth</arg> + <arg>${distcpMapBandwidth}</arg> + <arg>-sourceNN</arg> + <arg>${sourceNN}</arg> + <arg>-sourceExecUrl</arg> + <arg>${sourceExecUrl}</arg> + <arg>-sourceNNKerberosPrincipal</arg> + <arg>${sourceNNKerberosPrincipal}</arg> + <arg>-sourceSnapshotDir</arg> + <arg>${sourceSnapshotDir}</arg> + <arg>-targetNN</arg> + <arg>${targetNN}</arg> + <arg>-targetExecUrl</arg> + <arg>${targetExecUrl}</arg> + <arg>-targetNNKerberosPrincipal</arg> + <arg>${targetNNKerberosPrincipal}</arg> + <arg>-targetSnapshotDir</arg> + <arg>${targetSnapshotDir}</arg> + <arg>-tdeEncryptionEnabled</arg> + <arg>${tdeEncryptionEnabled}</arg> + <arg>-snapshotJobName</arg> + <arg>${snapshotJobName}-${nominalTime}</arg> + </java> + <ok to="snapshot-retention"/> + <error to="fail"/> + </action> + <!-- Snapshot retention action --> + <action name="snapshot-retention"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> <!-- hadoop 2 parameter --> + <name>oozie.launcher.mapreduce.job.user.classpath.first</name> + <value>true</value> + </property> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <property> + <name>oozie.use.system.libpath</name> + <value>true</value> + </property> + <property> + <name>oozie.action.sharelib.for.java</name> + <value>distcp</value> + </property> + <property> + <name>oozie.launcher.oozie.libpath</name> + <value>${wf:conf("falcon.libpath")}</value> + </property> + <property> + <name>oozie.launcher.mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + <property> + <name>mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + </configuration> + <main-class>org.apache.falcon.snapshots.retention.HdfsSnapshotEvictor</main-class> + <arg>-Dmapred.job.queue.name=${queueName}</arg> + <arg>-Dmapred.job.priority=${jobPriority}</arg> + <arg>-sourceNN</arg> 
+ <arg>${sourceNN}</arg> + <arg>-sourceExecUrl</arg> + <arg>${sourceExecUrl}</arg> + <arg>-sourceNNKerberosPrincipal</arg> + <arg>${sourceNNKerberosPrincipal}</arg> + <arg>-sourceSnapshotDir</arg> + <arg>${sourceSnapshotDir}</arg> + <arg>-sourceSnapshotRetentionPolicy</arg> + <arg>${sourceSnapshotRetentionPolicy}</arg> + <arg>-sourceSnapshotRetentionAgeLimit</arg> + <arg>${sourceSnapshotRetentionAgeLimit}</arg> + <arg>-sourceSnapshotRetentionNumber</arg> + <arg>${sourceSnapshotRetentionNumber}</arg> + <arg>-targetNN</arg> + <arg>${targetNN}</arg> + <arg>-targetExecUrl</arg> + <arg>${targetExecUrl}</arg> + <arg>-targetNNKerberosPrincipal</arg> + <arg>${targetNNKerberosPrincipal}</arg> + <arg>-targetSnapshotDir</arg> + <arg>${targetSnapshotDir}</arg> + <arg>-targetSnapshotRetentionPolicy</arg> + <arg>${targetSnapshotRetentionPolicy}</arg> + <arg>-targetSnapshotRetentionAgeLimit</arg> + <arg>${targetSnapshotRetentionAgeLimit}</arg> + <arg>-targetSnapshotRetentionNumber</arg> + <arg>${targetSnapshotRetentionNumber}</arg> + <arg>-snapshotJobName</arg> + <arg>${snapshotJobName}-${nominalTime}</arg> + </java> + <ok to="end"/> + <error to="fail"/> + </action> + <kill name="fail"> + <message> + Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + </message> + </kill> + <end name="end"/> +</workflow-app> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/README ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/README b/addons/hdfs-snapshot-mirroring/README new file mode 100644 index 0000000..fc33d3a --- /dev/null +++ b/addons/hdfs-snapshot-mirroring/README @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +HDFS Snapshot Mirroring Extension. + +Overview +This extension implements replication for snapshot-able directories on HDFS from one +Hadoop cluster to another. This piggybacks on snapshot solution supported in HDFS (HDFS-7535). +It also performs retention on the snapshots generated in source and target. + +Use Case +* Create snapshots in source directory +* Copy this directory between HDFS clusters +* Create snapshot in target directory +* Handle snapshot retention in source and target directories + +Limitations +If TDE encryption is enabled, this snapshot based replication is not efficient. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/pom.xml b/addons/hdfs-snapshot-mirroring/pom.xml new file mode 100644 index 0000000..5240d62 --- /dev/null +++ b/addons/hdfs-snapshot-mirroring/pom.xml @@ -0,0 +1,188 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-main</artifactId> + <version>0.10-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <artifactId>falcon-hdfs-snapshot-mirroring</artifactId> + <description>Apache Falcon HDFS Snapshot based Replication Module</description> + <name>Apache Falcon HDFS Snapshot Replication</name> + <packaging>jar</packaging> + + <dependencies> + <!-- dependencies are always listed in sorted order by groupId, artifactId --> + <!-- intra-project --> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + <!-- inter-project --> + <dependency> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-test-util</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-extensions</artifactId> + </dependency> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-hadoop-dependencies</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-metrics</artifactId> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>hadoop-2</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <classifier>tests</classifier> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <classifier>tests</classifier> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-hdfs</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>compile</scope> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-resourcemanager</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-nodemanager</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-distcp</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>${derby.version}</version> + </dependency> + </dependencies> + </profile> + </profiles> + + <build> + <sourceDirectory>${basedir}/src/main/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <property> + <name>derby.stream.error.file</name> + <value>target/derby.log</value> + </property> + </systemProperties> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java new file mode 100644 index 0000000..2e41cc0 --- /dev/null +++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.snapshots.replication; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.snapshots.util.HdfsSnapshotUtil; +import org.apache.falcon.workflow.util.OozieActionConfigurationHelper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * HDFS snapshot generator and snapshot based replicator. + */ +public class HdfsSnapshotReplicator extends Configured implements Tool { + private static final Logger LOG = LoggerFactory.getLogger(HdfsSnapshotReplicator.class); + protected CommandLine cmd; + + public static void main(String[] args) throws Exception { + Configuration conf = OozieActionConfigurationHelper.createActionConf(); + int ret = ToolRunner.run(conf, new HdfsSnapshotReplicator(), args); + if (ret != 0) { + throw new Exception("Unable to perform Snapshot based replication action args: " + Arrays.toString(args)); + } + } + + @Override + public int run(String[] args) throws FalconException { + cmd = getCommand(args); + + String sourceStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_NN.getName()); + String targetStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_NN.getName()); + + DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd); + DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd); + + String currentSnapshotName = HdfsSnapshotUtil.SNAPSHOT_PREFIX + + cmd.getOptionValue(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName()) + + "-" + System.currentTimeMillis(); + String sourceDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName()); + String targetDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName()); + + // Generate snapshot on source. + createSnapshotInFileSystem(sourceDir, currentSnapshotName, sourceFs); + + // Find most recently replicated snapshot. If it exists, distCp using the snapshots. + // If not, do regular distcp as this is the first time job is running. + invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs, + sourceDir, targetDir, currentSnapshotName); + + // Generate snapshot on target if distCp succeeds.
+ createSnapshotInFileSystem(targetDir, currentSnapshotName, targetFs); + + LOG.info("Completed HDFS Snapshot Replication."); + return 0; + } + + private static void createSnapshotInFileSystem(String dirName, String snapshotName, + FileSystem fs) throws FalconException { + try { + LOG.info("Creating snapshot {} in directory {}", snapshotName, dirName); + fs.createSnapshot(new Path(dirName), snapshotName); + } catch (IOException e) { + LOG.warn("Unable to create snapshot {} in filesystem {}. Exception is {}", + snapshotName, fs.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY), e.getMessage()); + throw new FalconException("Unable to create snapshot " + snapshotName, e); + } + } + + protected void invokeCopy(String sourceStorageUrl, String targetStorageUrl, + DistributedFileSystem sourceFs, DistributedFileSystem targetFs, + String sourceDir, String targetDir, + String currentSnapshotName) throws FalconException { + try { + Configuration jobConf = this.getConf(); + DistCpOptions options = getDistCpOptions(sourceStorageUrl, targetStorageUrl, + sourceFs, targetFs, sourceDir, targetDir, currentSnapshotName); + DistCp distCp = new DistCp(jobConf, options); + LOG.info("Started Snapshot based DistCp from {} to {} ", getStagingUri(sourceStorageUrl, sourceDir), + getStagingUri(targetStorageUrl, targetDir)); + Job distcpJob = distCp.execute(); + LOG.info("DistCp Hadoop job: {}", distcpJob.getJobID().toString()); + LOG.info("Completed Snapshot based DistCp"); + + } catch (FalconException fe) { + throw fe; + } catch (Exception e) { + throw new FalconException("Unable to replicate HDFS directory using snapshots.", e); + } + } + + private DistCpOptions getDistCpOptions(String sourceStorageUrl, String targetStorageUrl, + DistributedFileSystem sourceFs, DistributedFileSystem targetFs, + String sourceDir, String targetDir, + String currentSnapshotName) throws FalconException { + + List<Path> sourceUris = new ArrayList<Path>(); + sourceUris.add(new Path(getStagingUri(sourceStorageUrl, sourceDir))); + + DistCpOptions distcpOptions = new DistCpOptions(sourceUris, + new Path(getStagingUri(targetStorageUrl, targetDir))); + + // Settings needed for Snapshot distCp. + distcpOptions.setSyncFolder(true); + distcpOptions.setDeleteMissing(true); + + // Use snapshot diff if two snapshots exist. Else treat it as simple distCp. + // get latest replicated snapshot.
+ String replicatedSnapshotName = findLatestReplicatedSnapshot(sourceFs, targetFs, sourceDir, targetDir); + if (StringUtils.isNotBlank(replicatedSnapshotName)) { + distcpOptions.setUseDiff(true, replicatedSnapshotName, currentSnapshotName); + } + + if (Boolean.valueOf(cmd.getOptionValue(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName()))) { + // skipCRCCheck and update enabled + distcpOptions.setSkipCRC(true); + } + + distcpOptions.setBlocking(true); + distcpOptions.setMaxMaps( + Integer.parseInt(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()))); + distcpOptions.setMapBandwidth( + Integer.parseInt(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName()))); + return distcpOptions; + } + + private String findLatestReplicatedSnapshot(DistributedFileSystem sourceFs, DistributedFileSystem targetFs, + String sourceDir, String targetDir) throws FalconException { + try { + FileStatus[] sourceSnapshots = sourceFs.listStatus(new Path(getSnapshotDir(sourceDir))); + Set<String> sourceSnapshotNames = new HashSet<String>(); + for (FileStatus snapshot : sourceSnapshots) { + sourceSnapshotNames.add(snapshot.getPath().getName()); + } + + FileStatus[] targetSnapshots = targetFs.listStatus(new Path(getSnapshotDir(targetDir))); + if (targetSnapshots.length > 0) { + //sort target snapshots in desc order of creation time. + Arrays.sort(targetSnapshots, new Comparator<FileStatus>() { + @Override + public int compare(FileStatus f1, FileStatus f2) { + return Long.compare(f2.getModificationTime(), f1.getModificationTime()); + } + }); + + // get most recent snapshot name that exists in source. + for (int i = 0; i < targetSnapshots.length; i++) { + String name = targetSnapshots[i].getPath().getName(); + if (sourceSnapshotNames.contains(name)) { + return name; + } + } + // If control reaches here, + // there are snapshots on target, but none are replicated from source. Return null. 
+ } // No target snapshots, return null + return null; + } catch (IOException e) { + LOG.error("Unable to find latest snapshot on targetDir {} {}", targetDir, e.getMessage()); + throw new FalconException("Unable to find latest snapshot on targetDir " + targetDir, e); + } + } + + private String getStagingUri(String storageUrl, String dir) { + storageUrl = StringUtils.removeEnd(storageUrl, Path.SEPARATOR); + return storageUrl + Path.SEPARATOR + dir; + } + + private String getSnapshotDir(String dirName) { + dirName = StringUtils.removeEnd(dirName, Path.SEPARATOR); + return dirName + Path.SEPARATOR + HdfsSnapshotUtil.SNAPSHOT_DIR_PREFIX + Path.SEPARATOR; + } + + protected CommandLine getCommand(String[] args) throws FalconException { + Options options = new Options(); + + Option opt = new Option(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), + true, "max number of maps to use for distcp"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), + true, "Bandwidth in MB/s used by each mapper during replication"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), true, "Source NN"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(), + true, "Replication instance job Exec Url"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(), + true, "Replication instance job NN Kerberos Principal"); + opt.setRequired(false); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(), + true, "Source snapshot-able dir to replicate"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_NN.getName(), true, "Target NN"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(), + true, "Replication instance target Exec Url"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(), + true, "Replication instance target NN Kerberos Principal"); + opt.setRequired(false); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(), + true, "Target snapshot-able dir to replicate"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), + true, "Is TDE encryption enabled on dirs being replicated?"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), + true, "Replication instance job name"); + opt.setRequired(true); + options.addOption(opt); + + try { + return new GnuParser().parse(options, args); + } catch (ParseException pe) { + LOG.info("Unable to parse command line arguments for HdfsSnapshotReplicator " + pe.getMessage()); + throw new FalconException(pe.getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java new file mode 100644 index 0000000..22e3377 --- /dev/null +++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.snapshots.retention; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.falcon.FalconException; +import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties; +import org.apache.falcon.retention.EvictionHelper; +import org.apache.falcon.snapshots.util.HdfsSnapshotUtil; +import org.apache.falcon.workflow.util.OozieActionConfigurationHelper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.jsp.el.ELException; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; + +/** + * HDFS snapshot evictor. 
+ */ +public class HdfsSnapshotEvictor extends Configured implements Tool { + private static final Logger LOG = LoggerFactory.getLogger(HdfsSnapshotEvictor.class); + + public static void main(String[] args) throws Exception { + Configuration conf = OozieActionConfigurationHelper.createActionConf(); + int ret = ToolRunner.run(conf, new HdfsSnapshotEvictor(), args); + if (ret != 0) { + throw new Exception("Unable to perform eviction action args: " + Arrays.toString(args)); + } + } + + @Override + public int run(String[] args) throws Exception { + CommandLine cmd = getCommand(args); + DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd); + DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd); + + String sourceDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName()); + String targetDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName()); + + // evict on source + String retPolicy = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName()); + String ageLimit = cmd.getOptionValue( + HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName()); + int numSnapshots = Integer.parseInt( + cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName())); + if (retPolicy.equalsIgnoreCase("delete")) { + evictSnapshots(sourceFs, sourceDir, ageLimit, numSnapshots); + } else { + LOG.warn("Unsupported source retention policy {}", retPolicy); + throw new FalconException("Unsupported source retention policy " + retPolicy); + } + + // evict on target + retPolicy = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName()); + ageLimit = cmd.getOptionValue( + HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName()); + numSnapshots = Integer.parseInt( + cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName())); + if (retPolicy.equalsIgnoreCase("delete")) { + evictSnapshots(targetFs, targetDir, ageLimit, numSnapshots); + } else { + LOG.warn("Unsupported target retention policy {}", retPolicy); + throw new FalconException("Unsupported target retention policy " + retPolicy); + } + + LOG.info("Completed HDFS Snapshot Eviction."); + return 0; + } + + protected static void evictSnapshots(DistributedFileSystem fs, String dirName, String ageLimit, + int numSnapshots) throws FalconException { + try { + LOG.info("Started evicting snapshots on dir {}{} using policy {}, agelimit {}, numSnapshot {}", + fs.getUri(), dirName, ageLimit, numSnapshots); + + long evictionTime = System.currentTimeMillis() - EvictionHelper.evalExpressionToMilliSeconds(ageLimit); + + dirName = StringUtils.removeEnd(dirName, Path.SEPARATOR); + String snapshotDir = dirName + Path.SEPARATOR + HdfsSnapshotUtil.SNAPSHOT_DIR_PREFIX + Path.SEPARATOR; + FileStatus[] snapshots = fs.listStatus(new Path(snapshotDir)); + if (snapshots.length <= numSnapshots) { + // no eviction needed + return; + } + + // Sort by last modified time, ascending order. 
+ Arrays.sort(snapshots, new Comparator<FileStatus>() { + @Override + public int compare(FileStatus f1, FileStatus f2) { + return Long.compare(f1.getModificationTime(), f2.getModificationTime()); + } + }); + + for (int i = 0; i < (snapshots.length - numSnapshots); i++) { + // delete if older than ageLimit while retaining numSnapshots + if (snapshots[i].getModificationTime() < evictionTime) { + fs.deleteSnapshot(new Path(dirName), snapshots[i].getPath().getName()); + } + } + + } catch (ELException ele) { + LOG.warn("Unable to parse retention age limit {} {}", ageLimit, ele.getMessage()); + throw new FalconException("Unable to parse retention age limit " + ageLimit, ele); + } catch (IOException ioe) { + LOG.warn("Unable to evict snapshots from dir {} {}", dirName, ioe); + throw new FalconException("Unable to evict snapshots from dir " + dirName, ioe); + } + + } + + private CommandLine getCommand(String[] args) throws org.apache.commons.cli.ParseException { + Options options = new Options(); + + Option opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), true, "Source Cluster"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(), + true, "Replication instance job Exec Url"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(), + true, "Replication instance job NN Kerberos Principal"); + opt.setRequired(false); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(), + true, "Source snapshot-able dir to replicate"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_NN.getName(), true, "Target Cluster"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(), + true, "Target snapshot-able dir to replicate"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(), + true, "Replication instance target Exec Url"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(), + true, "Replication instance target NN Kerberos Principal"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), true, + "Replication instance job name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(), + true, "Source retention policy"); + opt.setRequired(false); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName(), + true, "Source delete snapshots older than agelimit"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName(), + true, "Source number of snapshots to retain"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(), + true, "Target retention policy"); + opt.setRequired(false); + options.addOption(opt); + opt = new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName(), + true, "Target delete snapshots older than agelimit"); + opt.setRequired(true); + options.addOption(opt); + opt = 
new Option(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName(), + true, "Target number of snapshots to retain"); + opt.setRequired(true); + options.addOption(opt); + + return new GnuParser().parse(options, args); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java new file mode 100644 index 0000000..5196791 --- /dev/null +++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.snapshots.util; + +import org.apache.commons.cli.CommandLine; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties; +import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DistributedFileSystem; + +/** + * Util class for HDFS snapshot based mirroring. 
+ */ +public final class HdfsSnapshotUtil { + + public static final String SNAPSHOT_PREFIX = "falcon-snapshot-"; + public static final String SNAPSHOT_DIR_PREFIX = ".snapshot"; + + private HdfsSnapshotUtil() {} + + public static DistributedFileSystem getSourceFileSystem(CommandLine cmd) throws FalconException { + String sourceStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_NN.getName()); + String sourceExecuteEndpoint = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName()); + String sourcePrincipal = parseKerberosPrincipal(cmd.getOptionValue( + HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName())); + Configuration sourceConf = ClusterHelper.getConfiguration(sourceStorageUrl, + sourceExecuteEndpoint, sourcePrincipal); + return HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf); + } + + public static DistributedFileSystem getTargetFileSystem(CommandLine cmd) throws FalconException { + String targetStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_NN.getName()); + String targetExecuteEndpoint = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName()); + String targetPrincipal = parseKerberosPrincipal(cmd.getOptionValue( + HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName())); + + Configuration targetConf = ClusterHelper.getConfiguration(targetStorageUrl, + targetExecuteEndpoint, targetPrincipal); + return HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf); + } + + public static String parseKerberosPrincipal(String principal) { + if (principal.equals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL)) { + return null; + } + return principal; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java new file mode 100644 index 0000000..7924214 --- /dev/null +++ b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.falcon.snapshots.replication; + +import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.MiniHdfsClusterUtil; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.InputStream; +import java.nio.file.Files; + +/** + * Hdfs Snapshot replicator unit tests. + */ +public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator { + private MiniDFSCluster miniDFSCluster; + private DistributedFileSystem miniDfs; + private File baseDir; + private Cluster sourceCluster; + private Cluster targetCluster; + private String sourceStorageUrl; + private String targetStorageUrl; + private Path sourceDir = new Path("/apps/falcon/snapshot-replication/sourceDir/"); + private Path targetDir = new Path("/apps/falcon/snapshot-replication/targetDir/"); + + private FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); + + private String[] args = {"--" + HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1", + "--" + HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), "100", + "--" + HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), "hdfs://localhost:54136", + "--" + HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(), "localhost:8021", + "--" + HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(), "localhost:8021", + "--" + HdfsSnapshotMirrorProperties.TARGET_NN.getName(), "hdfs://localhost:54136", + "--" + HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(), + "/apps/falcon/snapshot-replication/sourceDir/", + "--" + HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(), + "/apps/falcon/snapshot-replication/targetDir/", + "--" + HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "false", + "--" + HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), "snapshotJobName", }; + + @BeforeClass + public void init() throws Exception { + baseDir = Files.createTempDirectory("test_snapshot-replication").toFile().getAbsoluteFile(); + miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.SNAPSHOT_REPL_TEST_PORT, baseDir); + miniDfs = miniDFSCluster.getFileSystem(); + + sourceCluster = initCluster("/primary-cluster-0.1.xml"); + targetCluster = initCluster("/backup-cluster-0.1.xml"); + + miniDfs.mkdirs(sourceDir, fsPermission); + miniDfs.mkdirs(targetDir, fsPermission); + + miniDfs.allowSnapshot(sourceDir); + miniDfs.allowSnapshot(targetDir); + + cmd = getCommand(args); + Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()), "1"); + Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName()), "100"); + + } + + private Cluster initCluster(String clusterName) throws Exception { + InputStream inputStream = 
getClass().getResourceAsStream(clusterName); + Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(inputStream); + ConfigurationStore.get().publish(EntityType.CLUSTER, cluster); + return cluster; + } + + @Test + public void replicationTest() throws Exception { + Configuration sourceConf = ClusterHelper.getConfiguration(sourceCluster); + this.setConf(sourceConf); + Configuration targetConf = ClusterHelper.getConfiguration(targetCluster); + sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster); + targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster); + + DistributedFileSystem sourceFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf); + DistributedFileSystem targetFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf); + + // create dir1, create snapshot, invoke copy, check dir1 in target, create snapshot on target + Path dir1 = new Path(sourceDir, "dir1"); + miniDfs.mkdir(dir1, fsPermission); + miniDfs.createSnapshot(sourceDir, "snapshot1"); + invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs, + sourceDir.toString(), targetDir.toString(), "snapshot1"); + miniDfs.createSnapshot(targetDir, "snapshot1"); + Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir1"))); + + // create dir2, create snapshot, invoke copy, check dir2 in target, create snapshot on target + Path dir2 = new Path(sourceDir, "dir2"); + miniDfs.mkdir(dir2, fsPermission); + miniDfs.createSnapshot(sourceDir, "snapshot2"); + invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs, + sourceDir.toString(), targetDir.toString(), "snapshot2"); + miniDfs.createSnapshot(targetDir, "snapshot2"); + Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir1"))); + Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir2"))); + + // delete dir1, create snapshot, invoke copy, check dir1 not in target + miniDfs.delete(dir1, true); + miniDfs.createSnapshot(sourceDir, "snapshot3"); + invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs, + sourceDir.toString(), targetDir.toString(), "snapshot3"); + miniDfs.createSnapshot(targetDir, "snapshot3"); + Assert.assertFalse(miniDfs.exists(new Path(targetDir, "dir1"))); + Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir2"))); + } + + @Test(dependsOnMethods = "replicationTest", + expectedExceptions = FalconException.class, + expectedExceptionsMessageRegExp = "Unable to find latest snapshot on targetDir " + + "/apps/falcon/snapshot-replication/targetDir") + public void removeSnapshotabilityOnTargetTest() throws Exception { + // remove snapshotability on target, create snapshot on source; invoking copy should fail + miniDfs.deleteSnapshot(targetDir, "snapshot1"); + miniDfs.deleteSnapshot(targetDir, "snapshot2"); + miniDfs.deleteSnapshot(targetDir, "snapshot3"); + + miniDfs.disallowSnapshot(targetDir); + Path dir4 = new Path(sourceDir, "dir4"); + miniDfs.mkdir(dir4, fsPermission); + miniDfs.createSnapshot(sourceDir, "snapshot4"); + invokeCopy(sourceStorageUrl, targetStorageUrl, miniDfs, miniDfs, + sourceDir.toString(), targetDir.toString(), "snapshot4"); + } + + @AfterClass + public void cleanup() throws Exception { + MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java ---------------------------------------------------------------------- diff --git 
a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java new file mode 100644 index 0000000..a73c399 --- /dev/null +++ b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.snapshots.retention; + +import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.MiniHdfsClusterUtil; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.nio.file.Files; + +/** + * Hdfs snapshot evictor unit tests. 
+ */ +public class HdfsSnapshotEvictorTest extends HdfsSnapshotEvictor { + + private static final int NUM_FILES = 7; + private MiniDFSCluster miniDFSCluster; + private DistributedFileSystem miniDfs; + private File baseDir; + private Path evictionDir = new Path("/apps/falcon/snapshot-eviction/"); + private FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); + + @BeforeClass + public void init() throws Exception { + baseDir = Files.createTempDirectory("test_snapshot-eviction_hdfs").toFile().getAbsoluteFile(); + miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.SNAPSHOT_EVICTION_TEST_PORT, baseDir); + miniDfs = miniDFSCluster.getFileSystem(); + miniDfs.mkdirs(evictionDir, fsPermission); + miniDfs.allowSnapshot(evictionDir); + createSnapshotsForEviction(); + } + + private void createSnapshotsForEviction() throws Exception { + for (int i = 0; i < NUM_FILES; i++) { + miniDfs.createSnapshot(evictionDir, String.valueOf(i)); + // space snapshot creation times apart so retention-based eviction can tell old snapshots from new + Thread.sleep(10000); + } + } + + @Test + public void evictionTest() throws Exception { + Path snapshotDir = new Path(evictionDir, ".snapshot"); + FileStatus[] fileStatuses = miniDfs.listStatus(snapshotDir); + Assert.assertEquals(fileStatuses.length, NUM_FILES); + + evictSnapshots(miniDfs, evictionDir.toString(), "minutes(1)", NUM_FILES + 1); + fileStatuses = miniDfs.listStatus(snapshotDir); + Assert.assertEquals(fileStatuses.length, NUM_FILES); + + evictSnapshots(miniDfs, evictionDir.toString(), "minutes(1)", NUM_FILES - 1); + fileStatuses = miniDfs.listStatus(snapshotDir); + Assert.assertEquals(fileStatuses.length, NUM_FILES - 1); + + evictSnapshots(miniDfs, evictionDir.toString(), "minutes(1)", 2); + fileStatuses = miniDfs.listStatus(snapshotDir); + Assert.assertTrue(fileStatuses.length >= 5); + } + + @Test(expectedExceptions = FalconException.class, + expectedExceptionsMessageRegExp = "Unable to evict snapshots from dir /apps/falcon/non-snapshot-eviction") + public void evictionTestCannotSnapshot() throws Exception { + Path nonSnapshotDir = new Path("/apps/falcon/non-snapshot-eviction/"); + miniDfs.mkdirs(nonSnapshotDir, fsPermission); + evictSnapshots(miniDfs, nonSnapshotDir.toString(), "minutes(1)", NUM_FILES); + } + + @AfterClass + public void cleanup() throws Exception { + MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml b/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml new file mode 100644 index 0000000..f89237e --- /dev/null +++ b/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml @@ -0,0 +1,44 @@ +<?xml version="1.0"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<cluster colo="gs" description="" name="backupSnapshotRepl" xmlns="uri:falcon:cluster:0.1" + > + <interfaces> + <interface type="readonly" endpoint="hftp://localhost:50010" + version="0.20.2"/> + <interface type="write" endpoint="hdfs://localhost:54136" + version="0.20.2"/> + <interface type="execute" endpoint="localhost:8021" version="0.20.2"/> + <interface type="workflow" endpoint="http://localhost:11000/oozie/" + version="4.0"/> + <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" + version="5.1.6"/> + <interface type="registry" endpoint="Hcat" version="1"/> + </interfaces> + <locations> + <location name="staging" path="/projects/falcon/staging"/> + <location name="temp" path="/tmp"/> + <location name="working" path="/projects/falcon/working"/> + </locations> + <properties> + <property name="field1" value="value1"/> + <property name="field2" value="value2"/> + <property name="hive.metastore.client.socket.timeout" value="20"/> + </properties> +</cluster> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml b/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml new file mode 100644 index 0000000..30c6242 --- /dev/null +++ b/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml @@ -0,0 +1,43 @@ +<?xml version="1.0"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<cluster colo="gs" description="" name="primarySnapshotRepl" xmlns="uri:falcon:cluster:0.1"> + <interfaces> + <interface type="readonly" endpoint="hftp://localhost:50010" + version="0.20.2"/> + <interface type="write" endpoint="hdfs://localhost:54136" + version="0.20.2"/> + <interface type="execute" endpoint="localhost:8021" version="0.20.2"/> + <interface type="workflow" endpoint="http://localhost:11000/oozie/" + version="4.0"/> + <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" + version="5.1.6"/> + <interface type="registry" endpoint="Hcat" version="1"/> + </interfaces> + <locations> + <location name="staging" path="/projects/falcon/staging"/> + <location name="temp" path="/tmp"/> + <location name="working" path="/projects/falcon/working"/> + </locations> + <properties> + <property name="field1" value="value1"/> + <property name="field2" value="value2"/> + <property name="hive.metastore.client.socket.timeout" value="20"/> + </properties> +</cluster> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java index 24ba7d7..9d79742 100644 --- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java @@ -18,6 +18,7 @@ package org.apache.falcon.entity; +import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; @@ -28,6 +29,7 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.cluster.Location; import org.apache.falcon.entity.v0.cluster.Property; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.security.SecurityUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -71,6 +73,18 @@ public final class ClusterHelper { return conf; } + public static Configuration getConfiguration(String storageUrl, String executeEndPoint, + String kerberosPrincipal) { + Configuration conf = new Configuration(); + conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl); + conf.set(HadoopClientFactory.MR_JT_ADDRESS_KEY, executeEndPoint); + conf.set(HadoopClientFactory.YARN_RM_ADDRESS_KEY, executeEndPoint); + if (StringUtils.isNotBlank(kerberosPrincipal)) { + conf.set(SecurityUtil.NN_PRINCIPAL, kerberosPrincipal); + } + return conf; + } + public static String getOozieUrl(Cluster cluster) { return getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java index e970439..3d6b16b 100644 --- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java +++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import 
org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; @@ -132,6 +133,28 @@ public final class HadoopClientFactory { } } + /** + * Return a DistributedFileSystem created with the authenticated proxy user for the specified conf. + * + * @param conf Configuration with all necessary information to create the FileSystem. + * @return DistributedFileSystem created with the provided proxyUser/group. + * @throws org.apache.falcon.FalconException + * if the filesystem could not be created. + */ + public DistributedFileSystem createDistributedProxiedFileSystem(final Configuration conf) throws FalconException { + Validate.notNull(conf, "configuration cannot be null"); + + String nameNode = getNameNode(conf); + try { + return createDistributedFileSystem(CurrentUser.getProxyUGI(), new URI(nameNode), conf); + } catch (URISyntaxException e) { + throw new FalconException("Exception while getting Distributed FileSystem for: " + nameNode, e); + } catch (IOException e) { + throw new FalconException("Exception while getting Distributed FileSystem for proxy: " + + CurrentUser.getUser(), e); + } + } + private static String getNameNode(Configuration conf) { return conf.get(FS_DEFAULT_NAME_KEY); } @@ -172,19 +195,7 @@ public final class HadoopClientFactory { @SuppressWarnings("ResultOfMethodCallIgnored") public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri, final Configuration conf) throws FalconException { - Validate.notNull(ugi, "ugi cannot be null"); - Validate.notNull(conf, "configuration cannot be null"); - - try { - if (UserGroupInformation.isSecurityEnabled()) { - ugi.checkTGTAndReloginFromKeytab(); - } - } catch (IOException ioe) { - throw new FalconException("Exception while getting FileSystem. Unable to check TGT for user " - + ugi.getShortUserName(), ioe); - } - - validateNameNode(uri, conf); + validateInputs(ugi, uri, conf); try { // prevent falcon impersonating falcon, no need to use doas @@ -201,13 +212,65 @@ public final class HadoopClientFactory { return FileSystem.get(uri, conf); } }); - } catch (InterruptedException ex) { + } catch (InterruptedException | IOException ex) { throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex); - } catch (IOException ex) { + } + } + + /** + * Return a DistributedFileSystem created with the provided user for the specified URI. + * + * @param ugi user group information + * @param uri file system URI. + * @param conf Configuration with all necessary information to create the FileSystem. + * @return DistributedFileSystem created with the provided user/group. + * @throws org.apache.falcon.FalconException + * if the filesystem could not be created. 
+ */ + @SuppressWarnings("ResultOfMethodCallIgnored") + public DistributedFileSystem createDistributedFileSystem(UserGroupInformation ugi, final URI uri, + final Configuration conf) throws FalconException { + validateInputs(ugi, uri, conf); + FileSystem returnFs; + try { + // prevent falcon impersonating falcon, no need to use doas + final String proxyUserName = ugi.getShortUserName(); + if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) { + LOG.info("Creating Distributed FS for the login user {}, impersonation not required", + proxyUserName); + returnFs = DistributedFileSystem.get(uri, conf); + } else { + LOG.info("Creating FS impersonating user {}", proxyUserName); + returnFs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { + public FileSystem run() throws Exception { + return DistributedFileSystem.get(uri, conf); + } + }); + } + + return (DistributedFileSystem) returnFs; + } catch (InterruptedException | IOException ex) { throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex); } } + private void validateInputs(UserGroupInformation ugi, final URI uri, + final Configuration conf) throws FalconException { + Validate.notNull(ugi, "ugi cannot be null"); + Validate.notNull(conf, "configuration cannot be null"); + + try { + if (UserGroupInformation.isSecurityEnabled()) { + ugi.checkTGTAndReloginFromKeytab(); + } + } catch (IOException ioe) { + throw new FalconException("Exception while getting FileSystem. Unable to check TGT for user " + + ugi.getShortUserName(), ioe); + } + + validateNameNode(uri, conf); + } + /** * This method validates if the execute url is able to reach the MR endpoint. *

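[Editor's note] For reviewers trying out the new client-factory path, the sketch below shows how the pieces introduced in this patch compose: the new ClusterHelper.getConfiguration(storageUrl, executeEndpoint, kerberosPrincipal) overload feeding HadoopClientFactory.createDistributedProxiedFileSystem(). This is illustrative only and not part of the patch; the class name SnapshotFsExample and the endpoint values are placeholders, and CurrentUser is assumed to have been initialized by the Falcon runtime, as it is elsewhere in this codebase.

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.ClusterHelper;
    import org.apache.falcon.hadoop.HadoopClientFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public final class SnapshotFsExample {
        private SnapshotFsExample() {}

        public static DistributedFileSystem connect() throws FalconException {
            // Build a Configuration for a remote cluster from its storage URL and
            // execute endpoint; the third argument is the NameNode Kerberos
            // principal and may be null/blank on non-secure clusters.
            Configuration conf = ClusterHelper.getConfiguration(
                    "hdfs://localhost:54136", "localhost:8021", null);
            // Obtain a DistributedFileSystem as the proxy user of the currently
            // logged-in Falcon user via the new factory method.
            return HadoopClientFactory.get().createDistributedProxiedFileSystem(conf);
        }
    }

The concrete DistributedFileSystem return type (rather than FileSystem) appears to be the point of the new factory method: snapshot administration calls such as allowSnapshot/disallowSnapshot, used by the replicator and evictor above, are only exposed on DistributedFileSystem.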