Repository: falcon
Updated Branches:
  refs/heads/master 0aeb6c89a -> 4aa036e68

FALCON-1088. Fixing FeedDelayParallelTimeoutTest and renaming it to FeedDelayTest, contributed by Pragya M


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fe97483d
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fe97483d
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fe97483d

Branch: refs/heads/master
Commit: fe97483d701e757269df8927598c6ae393596e1a
Parents: 0aeb6c8
Author: samarthg <[email protected]>
Authored: Wed Apr 1 04:51:22 2015 +0000
Committer: samarthg <[email protected]>
Committed: Wed Apr 1 04:51:22 2015 +0000

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   4 +
 .../falcon/regression/Entities/FeedMerlin.java  |   7 +
 .../prism/FeedDelayParallelTimeoutTest.java     | 112 ----------
 .../falcon/regression/prism/FeedDelayTest.java  | 224 +++++++++++++++++++
 4 files changed, 235 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/fe97483d/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 91f1bec..2d0e4c3 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -63,6 +63,10 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+
+   FALCON-1088 Fixing FeedDelayParallelTimeoutTest and renaming it to FeedDelayTest(Pragya M via
+   Samarth G)
+
    FALCON-1112 Migrate methods related to *Merlin.java classes from Util.java to their respective
    *Merlin.java (Paul Isaychuk via Ruslan Ostafiychuk)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/fe97483d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
index 76d34d8..70e2e73 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java
@@ -211,6 +211,13 @@ public class FeedMerlin extends Feed {
             cluster.setLocations(feedLocations);
             return this;
         }
+
+        public FeedClusterBuilder withDelay(Frequency frequency) {
+            cluster.setDelay(frequency);
+            return this;
+        }
+
+
     }
 
     /**
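
The new withDelay hook simply slots into the existing FeedClusterBuilder chain. A minimal,
illustrative sketch of how a test can attach a delayed source cluster to a feed (validity window
and delay values here are hypothetical; the call pattern mirrors the new FeedDelayTest below):

    // Illustrative only -- not part of this patch.
    String feed = bundles[0].getDataSets().get(0);
    feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
    feed = FeedMerlin.fromString(feed).addFeedCluster(
        new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
            .withRetention("hours(15)", ActionType.DELETE)
            .withValidity("2015-04-01T00:00Z", "2099-10-01T12:10Z")   // hypothetical validity window
            .withClusterType(ClusterType.SOURCE)
            .withDelay(new Frequency("minutes(40)"))                  // the new builder method
            .withDataLocation(feedInputPath)
            .build()).toString();
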

http://git-wip-us.apache.org/repos/asf/falcon/blob/fe97483d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
deleted file mode 100644
index 8474401..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayParallelTimeoutTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression.prism;
-
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-/**
- * Test delays in feed.
- */
-@Test(groups = "distributed")
-public class FeedDelayParallelTimeoutTest extends BaseTestClass {
-
-    private ColoHelper cluster1 = servers.get(0);
-    private ColoHelper cluster2 = servers.get(1);
-
-    private String baseTestDir = cleanAndGetTestDir();
-    private String feedInputPath = baseTestDir + MINUTE_DATE_PATTERN;
-    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
-    private static final Logger LOGGER = Logger.getLogger(FeedDelayParallelTimeoutTest.class);
-
-    @BeforeClass(alwaysRun = true)
-    public void uploadWorkflow() throws Exception {
-        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        Bundle bundle = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundle, cluster1);
-        bundles[1] = new Bundle(bundle, cluster2);
-
-        bundles[0].generateUniqueBundle(this);
-        bundles[1].generateUniqueBundle(this);
-        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
-        bundles[1].setProcessWorkflow(aggregateWorkflowDir);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    @Test(enabled = true, timeOut = 12000000)
-    public void timeoutTest() throws Exception {
-        bundles[0].setInputFeedDataPath(feedInputPath);
-
-        Bundle.submitCluster(bundles[0], bundles[1]);
-        FeedMerlin feedOutput01 = new FeedMerlin(bundles[0].getDataSets().get(0));
-        org.apache.falcon.entity.v0.Frequency delay =
-            new org.apache.falcon.entity.v0.Frequency(
-                "hours(5)");
-
-        feedOutput01.clearFeedClusters();
-
-        // uncomment below 2 line when falcon in sync with falcon
-
-        // feedOutput01 = instanceUtil.setFeedCluster(feedOutput01,
-        //     XmlUtil.createValidity("2013-04-21T00:00Z",
-        //     "2099-10-01T12:10Z"),XmlUtil.createRetention("hours(15)",ActionType.DELETE),
-        //     Util.readClusterName(bundles[1].getClusters().get(0)),ClusterType.SOURCE,"",delay,
-        //     feedInputPath);
-        // feedOutput01 = instanceUtil.setFeedCluster(feedOutput01,
-        //     XmlUtil.createValidity("2013-04-21T00:00Z",
-        //     "2099-10-01T12:25Z"),XmlUtil.createRetention("hours(15)",ActionType.DELETE),
-        //     Util.readClusterName(bundles[0].getClusters().get(0)),ClusterType.TARGET,"",delay,
-        //     feedOutputPath);
-
-        //feedOutput01 = instanceUtil.setFeedCluster(feedOutput01,
-        //     XmlUtil.createValidity("2013-04-21T00:00Z",
-        //     "2099-10-01T12:10Z"),XmlUtil.createRetention("hours(15)",ActionType.DELETE),
-        //     Util.readClusterName(bundles[1].getClusters().get(0)),ClusterType.SOURCE,"",
-        //     feedInputPath);
-        //feedOutput01 = instanceUtil.setFeedCluster(feedOutput01,
-        //     XmlUtil.createValidity("2013-04-21T00:00Z",
-        //     "2099-10-01T12:25Z"),XmlUtil.createRetention("hours(15)",ActionType.DELETE),
-        //     Util.readClusterName(bundles[0].getClusters().get(0)),ClusterType.TARGET,"",
-        //     feedOutputPath);
-
-        feedOutput01.setFeedProperty("timeout", "minutes(35)").setFeedProperty("parallel", "3");
-
-        LOGGER.info("feedOutput01: " + Util.prettyPrintXml(feedOutput01.toString()));
-        prism.getFeedHelper().submitAndSchedule(feedOutput01.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/fe97483d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
new file mode 100644
index 0000000..a72d339
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.prism;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.testng.Assert;
+import org.testng.annotations.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Test delays in feed.
+ */
+@Test(groups = "distributed")
+public class FeedDelayTest extends BaseTestClass {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedDelayTest.class);
+    private ColoHelper cluster1 = servers.get(0);
+    private ColoHelper cluster2 = servers.get(1);
+    private FileSystem cluster1FS = serverFS.get(0);
+    private OozieClient cluster2OC = serverOC.get(1);
+    private String baseTestDir = cleanAndGetTestDir();
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private String targetPath = baseTestDir + "/target";
+    private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN;
+    private String sourcePath = baseTestDir + "/source";
+    private String feedInputPath = sourcePath + MINUTE_DATE_PATTERN;
+
+    @BeforeClass(alwaysRun = true)
+    public void uploadWorkflow() throws Exception {
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        Bundle bundle = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundle, cluster1);
+        bundles[1] = new Bundle(bundle, cluster2);
+
+        bundles[0].generateUniqueBundle(this);
+        bundles[1].generateUniqueBundle(this);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[1].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        removeTestClassEntities();
+        cleanTestsDirs();
+    }
+
+    /* Test cases to check the delay feature in feed.
+     * Finds the missing dependencies of the coordinator based on
+     * the given delay in the entity and creates them.
+     * These should match the expected missing dependencies.
+     * Also checks the startTime of the replicated instance against the expected value.
+     * In case they don't match, the test should fail.
+     * @param sourceStartTime : start time of source cluster
+     * @param targetStartTime : start time of target cluster
+     * @param sourceDelay : delay in source cluster
+     * @param targetDelay : delay in target cluster
+     * @param flag : true if (sourceStartTime < targetStartTime) else false
+     * */
+    @Test(enabled = true, dataProvider = "Feed-Delay-Cases", timeOut = 12000000)
+    public void delayTest(String sourceStartTime, String targetStartTime,
+            String sourceDelay, String targetDelay, boolean flag) throws Exception {
+
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        Bundle.submitCluster(bundles[0], bundles[1]);
+        String feed = bundles[0].getDataSets().get(0);
+
+        feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
+
+        //set cluster1 as source
+        feed = FeedMerlin.fromString(feed).addFeedCluster(
+            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                .withRetention("hours(15)", ActionType.DELETE)
+                .withValidity(sourceStartTime, "2099-10-01T12:10Z")
+                .withClusterType(ClusterType.SOURCE)
+                .withDelay(new Frequency(sourceDelay))
+                .withDataLocation(feedInputPath)
+                .build()).toString();
+        //set cluster2 as target
+        feed = FeedMerlin.fromString(feed).addFeedCluster(
+            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                .withRetention("hours(15)", ActionType.DELETE)
+                .withValidity(targetStartTime, "2099-10-01T12:25Z")
+                .withClusterType(ClusterType.TARGET)
+                .withDelay(new Frequency(targetDelay))
+                .withDataLocation(targetDataLocation)
+                .build()).toString();
+
+        feed = FeedMerlin.fromString(feed).setFeedProperty("timeout", "minutes(35)").toString();
+        feed = FeedMerlin.fromString(feed).setFeedProperty("parallel", "3").toString();
+
+        LOGGER.info("feed : " + Util.prettyPrintXml(feed));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
+
+        //check if coordinator exists
+        InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
+        Assert.assertEquals(InstanceUtil
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), "REPLICATION"), 1);
+
+        //Finding bundleId of replicated instance on target
+        String bundleId = InstanceUtil.getLatestBundleID(cluster2, Util.readEntityName(feed), EntityType.FEED);
+
+        //Finding startTime of replicated instance on target
+        String startTimeOozie = OozieUtil.getCoordStartTime(cluster2, feed, 0);
+        String startTimeExpected = getStartTime(sourceStartTime, targetStartTime, new Frequency(sourceDelay), flag);
+
+        List<String> missingDep = getAndCreateDependencies(cluster1, cluster1FS, cluster2, bundleId);
+        List<String> qaDep = new ArrayList<String>();
+
+        if (flag) {
+            qaDep.add(sourcePath + "/" + sourceStartTime.replaceAll("-", "/").
+                replaceAll("T", "/").replaceAll(":", "/").replaceAll("Z", "/"));
+        } else {
+            qaDep.add(targetPath + "/" + sourceStartTime.replaceAll("-", "/").
+                replaceAll("T", "/").replaceAll(":", "/").replaceAll("Z", "/"));
+        }
+
+        //replication should start, wait while it ends
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 0,
+            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+
+        Assert.assertTrue(startTimeOozie.equals(startTimeExpected),
+            "Start time of bundle should be " + startTimeExpected + " but it is " + startTimeOozie);
+        matchDependencies(missingDep, qaDep);
+        LOGGER.info("Done");
+    }
+
+    @DataProvider(name = "Feed-Delay-Cases")
+    public Object[][] getDelayCases() {
+        return new Object[][] {
+            { TimeUtil.getTimeWrtSystemTime(-120), TimeUtil.getTimeWrtSystemTime(-120),
+                "minutes(40)", "minutes(20)", true, },
+            { TimeUtil.getTimeWrtSystemTime(-120), TimeUtil.getTimeWrtSystemTime(-120),
+                "minutes(20)", "minutes(40)", true, },
+            { TimeUtil.getTimeWrtSystemTime(-120), TimeUtil.getTimeWrtSystemTime(-240),
+                "minutes(40)", "minutes(20)", true, },
+            { TimeUtil.getTimeWrtSystemTime(-120), TimeUtil.getTimeWrtSystemTime(-60),
+                "minutes(40)", "minutes(20)", false, },
+        };
+    }
+
+    private List<String> getAndCreateDependencies(ColoHelper prismHelper1, FileSystem clusterFS1,
+            ColoHelper prismHelper2, String bundleId) throws OozieClientException, IOException {
+        List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
+        for (int i = 0; i < 10 && missingDependencies == null; ++i) {
+            TimeUtil.sleepSeconds(30);
+            LOGGER.info("sleeping...");
+            missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
+        }
+        Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
+
+        // Creating missing dependencies
+        HadoopUtil.createHDFSFolders(prismHelper1, missingDependencies);
+
+        //Adding data to empty folders
+        for (String location : missingDependencies) {
+            LOGGER.info("Transferring data to : " + location);
+            HadoopUtil.copyDataToFolder(clusterFS1, location, OSUtil.RESOURCES + "feed-s4Replication.xml");
+        }
+
+        return missingDependencies;
+    }
+
+    private String getStartTime(String sourceStartTime, String targetStartTime, Frequency sourceDelay, boolean flag) {
+        String finalDate;
+        if (flag) {
+            finalDate = TimeUtil.addMinsToTime(sourceStartTime, sourceDelay.getFrequencyAsInt());
+        } else {
+            finalDate = TimeUtil.addMinsToTime(targetStartTime, sourceDelay.getFrequencyAsInt());
+        }
+        return finalDate;
+    }
+
+    private boolean matchDependencies(List<String> fromJob, List<String> qaList) {
+        Collections.sort(fromJob);
+        Collections.sort(qaList);
+        if (fromJob.size() != qaList.size()) {
+            return false;
+        }
+        for (int index = 0; index < fromJob.size(); index++) {
+            if (!fromJob.get(index).contains(qaList.get(index))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
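
For clarity, the start-time assertion in delayTest() boils down to shifting the chosen cluster
start time by the source delay, as computed by getStartTime() above. A minimal worked sketch
(the timestamp and delay values are hypothetical; TimeUtil.addMinsToTime, Frequency and
getFrequencyAsInt are the same helpers the test uses):

    // Hypothetical values, same helpers as getStartTime() above.
    String sourceStartTime = "2015-04-01T00:00Z";
    Frequency sourceDelay = new Frequency("minutes(40)");
    String startTimeExpected =
        TimeUtil.addMinsToTime(sourceStartTime, sourceDelay.getFrequencyAsInt());
    // startTimeExpected should come out as "2015-04-01T00:40Z" and is compared against
    // OozieUtil.getCoordStartTime(cluster2, feed, 0) for the replication coordinator.
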
