Repository: falcon
Updated Branches:
  refs/heads/master 3b8d32ffd -> 33d72f77a
FALCON-1777 Add regression for HDFS replication (recipe). Contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/33d72f77
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/33d72f77
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/33d72f77

Branch: refs/heads/master
Commit: 33d72f77ac11dfd344d2905afc4d42e056f450b5
Parents: 3b8d32f
Author: Paul Isaychuk <[email protected]>
Authored: Thu Jan 28 17:37:40 2016 +0200
Committer: Paul Isaychuk <[email protected]>
Committed: Thu Jan 28 17:37:40 2016 +0200

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt              |   2 +
 .../regression/Entities/RecipeMerlin.java  |  40 +++---
 .../regression/hive/dr/HdfsRecipeTest.java | 126 +++++++++++++++++++
 .../hive-disaster-recovery-workflow.xml    |   8 +-
 .../hive-disaster-recovery.properties      |  21 ++--
 5 files changed, 169 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index f2df91a..e6664f3 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-1777 Add regression for HDFS replication (recipe) (Paul Isaychuk)
+
    FALCON-1699 Test fixes for RetentionTest, LineageApiTest, TouchAPIPrismAndServerTest, FeedReplicationTest and few fortifications(Paul Isaychuk via Pragya Mittal)
 
    FALCON-1698 New tests for ProcessSetupTest, ClusterSetupTest, UI test fixes(Paul Isaychuk via Ajay Yadava)


http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
index 40fec08..9b9cff2 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
@@ -131,27 +131,35 @@ public final class RecipeMerlin {
 
     public RecipeMerlin withSourceCluster(ClusterMerlin sourceCluster) {
         this.srcCluster = sourceCluster;
-        properties.setProperty("sourceCluster", sourceCluster.getName());
-        properties.setProperty("sourceMetastoreUri", sourceCluster.getProperty("hive.metastore.uris"));
-        properties.setProperty("sourceHiveServer2Uri", sourceCluster.getProperty("hive.server2.uri"));
-        //properties.setProperty("sourceServicePrincipal",
-        //    sourceCluster.getProperty("hive.metastore.kerberos.principal"));
-        properties.setProperty("sourceStagingPath", sourceCluster.getLocation("staging"));
-        properties.setProperty("sourceNN", sourceCluster.getInterfaceEndpoint(Interfacetype.WRITE));
-        properties.setProperty("sourceRM", sourceCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+        if (recipeOperation == FalconCLI.RecipeOperation.HDFS_REPLICATION) {
+            properties.setProperty("drSourceClusterFS", sourceCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+        } else {
+            properties.setProperty("sourceCluster", sourceCluster.getName());
+            properties.setProperty("sourceMetastoreUri", sourceCluster.getProperty("hive.metastore.uris"));
+            properties.setProperty("sourceHiveServer2Uri", sourceCluster.getProperty("hive.server2.uri"));
+            //properties.setProperty("sourceServicePrincipal",
+            //    sourceCluster.getProperty("hive.metastore.kerberos.principal"));
+            properties.setProperty("sourceStagingPath", sourceCluster.getLocation("staging"));
+            properties.setProperty("sourceNN", sourceCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+            properties.setProperty("sourceRM", sourceCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+        }
         return this;
     }
 
     public RecipeMerlin withTargetCluster(ClusterMerlin targetCluster) {
         this.tgtCluster = targetCluster;
-        properties.setProperty("targetCluster", targetCluster.getName());
-        properties.setProperty("targetMetastoreUri", targetCluster.getProperty("hive.metastore.uris"));
-        properties.setProperty("targetHiveServer2Uri", targetCluster.getProperty("hive.server2.uri"));
-        //properties.setProperty("targetServicePrincipal",
-        //    targetCluster.getProperty("hive.metastore.kerberos.principal"));
-        properties.setProperty("targetStagingPath", targetCluster.getLocation("staging"));
-        properties.setProperty("targetNN", targetCluster.getInterfaceEndpoint(Interfacetype.WRITE));
-        properties.setProperty("targetRM", targetCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+        if (recipeOperation == FalconCLI.RecipeOperation.HDFS_REPLICATION) {
+            properties.setProperty("drTargetClusterFS", targetCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+        } else {
+            properties.setProperty("targetCluster", targetCluster.getName());
+            properties.setProperty("targetMetastoreUri", targetCluster.getProperty("hive.metastore.uris"));
+            properties.setProperty("targetHiveServer2Uri", targetCluster.getProperty("hive.server2.uri"));
+            //properties.setProperty("targetServicePrincipal",
+            //    targetCluster.getProperty("hive.metastore.kerberos.principal"));
+            properties.setProperty("targetStagingPath", targetCluster.getLocation("staging"));
+            properties.setProperty("targetNN", targetCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+            properties.setProperty("targetRM", targetCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+        }
         return this;
     }
 
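
The RecipeMerlin change above keys the recipe properties off the operation type, so one builder serves both Hive DR and HDFS replication recipes. A minimal usage sketch, assuming srcCluster and tgtCluster are ClusterMerlin instances taken from a submitted test bundle (this mirrors how HdfsRecipeTest below drives the API):

    // Sketch under the assumptions above; srcCluster/tgtCluster are placeholders.
    RecipeMerlin hdfsRecipe = RecipeMerlin
            .readFromDir("HdfsRecipe", FalconCLI.RecipeOperation.HDFS_REPLICATION)
            .withRecipeCluster(srcCluster);
    hdfsRecipe.withSourceCluster(srcCluster) // HDFS_REPLICATION: sets drSourceClusterFS from the WRITE endpoint
            .withTargetCluster(tgtCluster);  // HDFS_REPLICATION: sets drTargetClusterFS from the WRITE endpoint
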

http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
new file mode 100644
index 0000000..05b9cf4
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hive.dr;
+
+import org.apache.falcon.cli.FalconCLI;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ClusterMerlin;
+import org.apache.falcon.regression.Entities.RecipeMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.MatrixUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Hdfs recipe test.
+ */
+@Test(groups = "embedded")
+public class HdfsRecipeTest extends BaseTestClass {
+    private static final Logger LOGGER = Logger.getLogger(HdfsRecipeTest.class);
+    private final ColoHelper cluster = servers.get(0);
+    private final ColoHelper cluster2 = servers.get(1);
+    private final FileSystem clusterFS = serverFS.get(0);
+    private final FileSystem clusterFS2 = serverFS.get(1);
+    private final OozieClient clusterOC = serverOC.get(0);
+    private final OozieClient clusterOC2 = serverOC.get(1);
+    private final String baseTestHDFSDir = cleanAndGetTestDir() + "/HdfsDR";
+    private String sourceDataLocation = baseTestHDFSDir + "/source";
+    private String targetDataLocation = baseTestHDFSDir + "/target";
+    private RecipeMerlin hdfsRecipe;
+
+    @DataProvider
+    public Object[][] getRecipeLocation() {
+        return MatrixUtil.crossProduct(RecipeExecLocation.values());
+    }
+
+    private void setUp(RecipeExecLocation recipeExecLocation) throws Exception {
+        bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster);
+        bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2);
+        bundles[0].generateUniqueBundle(this);
+        bundles[1].generateUniqueBundle(this);
+        final ClusterMerlin srcCluster = bundles[0].getClusterElement();
+        final ClusterMerlin tgtCluster = bundles[1].getClusterElement();
+        String recipeDir = "HdfsRecipe";
+        Bundle.submitCluster(recipeExecLocation.getRecipeBundle(bundles[0], bundles[1]));
+        hdfsRecipe = RecipeMerlin.readFromDir(recipeDir, FalconCLI.RecipeOperation.HDFS_REPLICATION)
+            .withRecipeCluster(recipeExecLocation.getRecipeCluster(srcCluster, tgtCluster));
+        hdfsRecipe.withSourceCluster(srcCluster)
+            .withTargetCluster(tgtCluster)
+            .withFrequency(new Frequency("5", Frequency.TimeUnit.minutes))
+            .withValidity(TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(15));
+        hdfsRecipe.setUniqueName(this.getClass().getSimpleName());
+    }
+
+    /**
+     * Test recipe based replication with 1 source and 1 target.
+     */
+    @Test(dataProvider = "getRecipeLocation")
+    public void test1Source1Target(RecipeExecLocation execLocation) throws Exception {
+        setUp(execLocation);
+        hdfsRecipe.withSourceDir(sourceDataLocation).withTargetDir(targetDataLocation);
+        final List<String> command = hdfsRecipe.getSubmissionCommand();
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(execLocation.getRecipeOC(clusterOC, clusterOC2),
+            hdfsRecipe.getName(), 1, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+
+        HadoopUtil.copyDataToFolder(clusterFS, sourceDataLocation, OSUtil.NORMAL_INPUT);
+
+        InstanceUtil.waitTillInstanceReachState(execLocation.getRecipeOC(clusterOC, clusterOC2),
+            hdfsRecipe.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        //check if data has been replicated correctly
+        List<Path> cluster1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(clusterFS, new Path(sourceDataLocation));
+        List<Path> cluster2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(clusterFS2, new Path(targetDataLocation));
+
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        try {
+            prism.getProcessHelper().deleteByName(hdfsRecipe.getName(), null);
+        } catch (Exception e) {
+            LOGGER.info("Deletion of process: " + hdfsRecipe.getName() + " failed with exception: " + e);
+        }
+        removeTestClassEntities();
+        cleanTestsDirs();
+    }
+}
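
AssertUtil.checkForListSizes in the test above only compares the file counts on the two clusters. A stricter check could also match the file names; the following is a hypothetical extension (not part of this commit), reusing the cluster1ReplicatedData and cluster2ReplicatedData lists computed in the test and a java.util.HashSet:

    // Hypothetical extension: verify that every replicated file on the target
    // corresponds to a source file with the same path relative to the data dir.
    Set<String> srcRelPaths = new HashSet<>();
    for (Path p : cluster1ReplicatedData) {
        srcRelPaths.add(p.toUri().getPath().replaceFirst(sourceDataLocation, ""));
    }
    for (Path p : cluster2ReplicatedData) {
        String rel = p.toUri().getPath().replaceFirst(targetDataLocation, "");
        Assert.assertTrue(srcRelPaths.contains(rel), "Unexpected file on target: " + p);
    }
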

http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
index 7c4c53a..aa820d0 100644
--- a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
+++ b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
@@ -47,6 +47,10 @@
                     <name>oozie.launcher.oozie.libpath</name>
                     <value>${wf:conf("falcon.libpath")}</value>
                 </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+                    <value>${drSourceClusterFS},${drTargetClusterFS}</value>
+                </property>
             </configuration>
             <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
@@ -56,7 +60,7 @@
             <arg>-mapBandwidth</arg>
             <arg>${distcpMapBandwidth}</arg>
             <arg>-sourcePaths</arg>
-            <arg>${drSourceClusterFS}${drSourceDir}</arg>
+            <arg>${drSourceDir}</arg>
             <arg>-targetPath</arg>
             <arg>${drTargetClusterFS}${drTargetDir}</arg>
             <arg>-falconFeedStorageType</arg>
@@ -64,7 +68,7 @@
             <arg>-availabilityFlag</arg>
             <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
             <arg>-counterLogDir</arg>
-            <arg>${logDir}/job-${nominalTime}</arg>
+            <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}</arg>
         </java>
         <ok to="end"/>
         <error to="fail"/>
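
Two changes in the workflow above are worth noting: oozie.launcher.mapreduce.job.hdfs-servers lists both filesystems so that, on secure clusters, the launcher can obtain delegation tokens for the remote namenode as well, and -sourcePaths no longer gets the ${drSourceClusterFS} prefix, so source paths resolve against the job's default FS. Purely as an illustration with made-up sample values, the changed arguments now resolve like this:

    // Illustration only; the endpoints and paths below are sample values, not from the commit.
    // With drSourceClusterFS=hdfs://nn1:8020, drSourceDir=/data/in,
    //      drTargetClusterFS=hdfs://nn2:8020, drTargetDir=/data/out:
    final String[] changedFeedReplicatorArgs = {
        "-sourcePaths", "/data/in",                // FS prefix no longer prepended
        "-targetPath", "hdfs://nn2:8020/data/out", // target keeps its explicit FS
    };
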

http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
index 7c95db7..6c715f3 100644
--- a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
+++ b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
@@ -26,14 +26,13 @@ falcon.recipe.name=hdfs-replication-monthly
 falcon.recipe.workflow.name=hdfs-dr-workflow
 # Provide Wf absolute path. This can be HDFS or local FS path. If WF is on local FS it will be copied to HDFS
 falcon.recipe.workflow.path=/apps/data-mirroring/workflows/hdfs-replication-workflow.xml
-# Provide Wf lib absolute path. This can be HDFS or local FS path. If libs are on local FS it will be copied to HDFS
-#falcon.recipe.workflow.lib.path=/recipes/hdfs-replication/lib
 
 ##### Cluster properties
+
 # Cluster where job should run
 falcon.recipe.cluster.name=primaryCluster
 # Change the cluster hdfs write end point here. This is mandatory.
-falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://240.0.0.10:8020
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020
 # Change the cluster validity start time here
 falcon.recipe.cluster.validity.start=2015-03-13T00:00Z
 # Change the cluster validity end time here
@@ -43,10 +42,6 @@ falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
 
 # Change the recipe frequency here. Valid frequency type are minutes, hours, days, months
 falcon.recipe.process.frequency=minutes(5)
 
-##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma
-##### Uncomment to add tags
-#falcon.recipe.tags=
-
 ##### Retry policy properties
 falcon.recipe.retry.policy=periodic
@@ -54,14 +49,20 @@ falcon.recipe.retry.delay=minutes(30)
 falcon.recipe.retry.attempts=3
 falcon.recipe.retry.onTimeout=false
 
+##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma
+##### Uncomment to add tags
+#falcon.recipe.tags=
+
 ##### ACL properties - Uncomment and change ACL if authorization is enabled
-falcon.recipe.acl.owner=ambari-qa
-falcon.recipe.acl.group=users
-falcon.recipe.acl.permission=0x755
+#falcon.recipe.acl.owner=ambari-qa
+#falcon.recipe.acl.group=users
+#falcon.recipe.acl.permission=0x755
+#falcon.recipe.nn.principal=nn/[email protected]
 
 ##### Custom Job properties
+# Specify multiple comma separated source directories
 drSourceDir=/user/falcon_qa/dr/test/primaryCluster/input
 drSourceClusterFS=hdfs://240.0.0.10:8020
 drTargetDir=/user/falcon_qa/dr/test/backupCluster/input
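
The recipe file above is a plain Java properties file. A minimal sketch of loading it and overriding an endpoint per test environment, assuming a path relative to a falcon checkout (RecipeMerlin.readFromDir is presumed to do something equivalent internally):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public final class RecipePropsSketch {
        public static void main(String[] args) throws IOException {
            Properties props = new Properties();
            // Assumed path; adjust to the checkout location.
            try (InputStream in = new FileInputStream(
                    "falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties")) {
                props.load(in);
            }
            // Per-environment override (sample value).
            props.setProperty("drSourceClusterFS", "hdfs://localhost:8020");
            System.out.println(props.getProperty("falcon.recipe.name")); // hdfs-replication-monthly
        }
    }
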
