http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java deleted file mode 100644 index 514fd10..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.regression.Entities.FeedMerlin; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.entity.v0.feed.ActionType; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.falcon.resource.InstancesResult; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - - -/** - * Feed instance status tests. - */ -@Test(groups = "embedded") -public class FeedInstanceStatusTest extends BaseTestClass { - - private String baseTestDir = cleanAndGetTestDir(); - private String feedInputPath = baseTestDir + MINUTE_DATE_PATTERN; - private String aggregateWorkflowDir = baseTestDir + "/aggregator"; - - private ColoHelper cluster2 = servers.get(1); - private ColoHelper cluster3 = servers.get(2); - private FileSystem cluster2FS = serverFS.get(1); - private FileSystem cluster3FS = serverFS.get(2); - private static final Logger LOGGER = Logger.getLogger(FeedInstanceStatusTest.class); - - @BeforeClass(alwaysRun = true) - public void uploadWorkflow() throws Exception { - uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - } - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - Bundle bundle = BundleUtil.readELBundle(); - for (int i = 0; i < 3; i++) { - bundles[i] = new Bundle(bundle, servers.get(i)); - bundles[i].generateUniqueBundle(this); - bundles[i].setProcessWorkflow(aggregateWorkflowDir); - } - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Goes through the whole feed replication workflow checking its instances status while. - * submitting feed, scheduling it, performing different combinations of actions like - * -submit, -resume, -kill, -rerun. - */ - @Test(groups = {"multiCluster"}) - public void feedInstanceStatusRunning() throws Exception { - bundles[0].setInputFeedDataPath(feedInputPath); - - AssertUtil.assertSucceeded(prism.getClusterHelper() - .submitEntity(bundles[0].getClusters().get(0))); - - AssertUtil.assertSucceeded(prism.getClusterHelper() - .submitEntity(bundles[1].getClusters().get(0))); - - AssertUtil.assertSucceeded(prism.getClusterHelper() - .submitEntity(bundles[2].getClusters().get(0))); - - String feed = bundles[0].getDataSets().get(0); - String feedName = Util.readEntityName(feed); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); - String startTime = TimeUtil.getTimeWrtSystemTime(-50); - final String startPlus20Min = TimeUtil.addMinsToTime(startTime, 20); - final String startPlus40Min = TimeUtil.addMinsToTime(startTime, 40); - final String startPlus100Min = TimeUtil.addMinsToTime(startTime, 100); - - feed = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder( - Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .withPartition("US/${cluster.colo}") - .build()) - .toString(); - feed = FeedMerlin.fromString(feed).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startPlus20Min, - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - feed = FeedMerlin.fromString(feed).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startPlus40Min, - TimeUtil.addMinsToTime(startTime, 110)) - .withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); - - //status before submit - prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus100Min - + "&end=" + TimeUtil.addMinsToTime(startTime, 120)); - - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed)); - prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startTime + "&end=" + startPlus100Min); - - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed)); - - // both replication instances - prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startTime + "&end=" + startPlus100Min); - - // single instance at -30 - prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus20Min); - - //single at -10 - prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min); - - //single at 10 - prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min); - - //single at 30 - prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min); - - String postFix = "/US/" + cluster2.getClusterHelper().getColoName(); - String prefix = bundles[0].getFeedDataPathPrefix(); - HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS); - HadoopUtil.lateDataReplenish(cluster2FS, 80, 20, prefix, postFix); - - postFix = "/UK/" + cluster3.getClusterHelper().getColoName(); - prefix = bundles[0].getFeedDataPathPrefix(); - HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS); - HadoopUtil.lateDataReplenish(cluster3FS, 80, 20, prefix, postFix); - - // both replication instances - prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startTime + "&end=" + startPlus100Min); - - // single instance at -30 - prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus20Min); - - //single at -10 - prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min); - - //single at 10 - prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min); - - //single at 30 - prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startPlus40Min); - - LOGGER.info("Wait till feed goes into running "); - - //suspend instances -10 - prism.getFeedHelper().getProcessInstanceSuspend(feedName, "?start=" + startPlus40Min); - prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startPlus20Min + "&end=" + startPlus40Min); - - //resuspend -10 and suspend -30 source specific - prism.getFeedHelper().getProcessInstanceSuspend(feedName, - "?start=" + startPlus20Min + "&end=" + startPlus40Min); - prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startPlus20Min + "&end=" + startPlus40Min); - - //resume -10 and -30 - prism.getFeedHelper().getProcessInstanceResume(feedName, - "?start=" + startPlus20Min + "&end=" + startPlus40Min); - prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startPlus20Min + "&end=" + startPlus40Min); - - //get running instances - prism.getFeedHelper().getRunningInstance(feedName); - - //rerun succeeded instance - prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime); - prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startTime + "&end=" + startPlus20Min); - - //kill instance - prism.getFeedHelper().getProcessInstanceKill(feedName, - "?start=" + TimeUtil.addMinsToTime(startTime, 44)); - prism.getFeedHelper().getProcessInstanceKill(feedName, "?start=" + startTime); - - //end time should be less than end of validity i.e startTime + 110 - prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110)); - - //rerun killed instance - prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime); - prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110)); - - //kill feed - prism.getFeedHelper().delete(feed); - InstancesResult responseInstance = prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110)); - - LOGGER.info(responseInstance.getMessage()); - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java deleted file mode 100644 index 5bb5e6e..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java +++ /dev/null @@ -1,230 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.feed.ActionType; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.regression.Entities.FeedMerlin; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.OozieClientException; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import javax.xml.bind.JAXBException; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.List; - -/** - * This test submits and schedules feed and then check for replication. - * On adding further late data it checks whether the data has been replicated correctly in the given late cut-off time. - * Assuming that late frequency set in server is 3 minutes. Although value can be changed according to requirement. - */ -@Test(groups = "embedded") -public class FeedLateRerunTest extends BaseTestClass { - - private ColoHelper cluster1 = servers.get(0); - private ColoHelper cluster2 = servers.get(1); - private FileSystem cluster1FS = serverFS.get(0); - private FileSystem cluster2FS = serverFS.get(1); - private OozieClient cluster2OC = serverOC.get(1); - private String baseTestDir = cleanAndGetTestDir(); - private String feedDataLocation = baseTestDir + "/source" + MINUTE_DATE_PATTERN; - private String targetPath = baseTestDir + "/target"; - private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN; - private static final Logger LOGGER = Logger.getLogger(FeedLateRerunTest.class); - private String source = null; - private String target = null; - - @BeforeMethod(alwaysRun = true) - public void setUp() throws JAXBException, IOException { - Bundle bundle = BundleUtil.readFeedReplicationBundle(); - bundles[0] = new Bundle(bundle, cluster1); - bundles[1] = new Bundle(bundle, cluster2); - bundles[0].generateUniqueBundle(this); - bundles[1].generateUniqueBundle(this); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - @Test(dataProvider = "dataFlagProvider") - public void testLateRerun(boolean dataFlag) - throws URISyntaxException, AuthenticationException, InterruptedException, IOException, - OozieClientException, JAXBException { - Bundle.submitCluster(bundles[0], bundles[1]); - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 30); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - - //configure feed - FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); - feed.setFilePath(feedDataLocation); - //erase all clusters from feed definition - feed.clearFeedClusters(); - //set cluster1 as source - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()); - //set cluster2 as target - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()); - String entityName = feed.getName(); - - //submit and schedule feed - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); - - //check if coordinator exists - InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, entityName, "REPLICATION"), 1); - - //Finding bundleId of replicated instance on target - String bundleId = OozieUtil.getLatestBundleID(cluster2OC, entityName, EntityType.FEED); - - //Finding and creating missing dependencies - List<String> missingDependencies = getAndCreateDependencies( - cluster1FS, cluster1.getPrefix(), cluster2OC, bundleId, dataFlag, entityName); - int count = 1; - for (String location : missingDependencies) { - if (count==1) { - source = location; - count++; - } - } - source=splitPathFromIp(source, "8020"); - LOGGER.info("source : " + source); - target = source.replace("source", "target"); - LOGGER.info("target : " + target); - /* Sleep for some time ( as is defined in runtime property of server ). - Let the instance rerun and then it should succeed.*/ - int sleepMins = 8; - for(int i=0; i < sleepMins; i++) { - LOGGER.info("Waiting..."); - TimeUtil.sleepSeconds(60); - } - String bundleID = OozieUtil.getLatestBundleID(cluster2OC, entityName, EntityType.FEED); - OozieUtil.validateRetryAttempts(cluster2OC, bundleID, EntityType.FEED, 1); - - //check if data has been replicated correctly - List<Path> cluster1ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster1FS, new Path(HadoopUtil.cutProtocol(source))); - List<Path> cluster2ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster2FS, new Path(HadoopUtil.cutProtocol(target))); - AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); - } - - private String splitPathFromIp(String src, String port) { - String reqSrc, tempSrc = ""; - if (src.contains(":")) { - String[] tempPath = src.split(":"); - for (String aTempPath : tempPath) { - if (aTempPath.startsWith(port)) { - tempSrc = aTempPath; - } - } - } - if (tempSrc.isEmpty()) { - reqSrc = src; - } else { - reqSrc=tempSrc.replace(port, ""); - } - return reqSrc; - } - - /* prismHelper1 - source colo, prismHelper2 - target colo */ - private List<String> getAndCreateDependencies(FileSystem sourceFS, String prefix, OozieClient targetOC, - String bundleId, boolean dataFlag, String entityName) throws OozieClientException, IOException { - List<String> missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId); - for (int i = 0; i < 10 && missingDependencies == null; ++i) { - TimeUtil.sleepSeconds(30); - LOGGER.info("sleeping..."); - missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId); - } - Assert.assertNotNull(missingDependencies, "Missing dependencies not found."); - //print missing dependencies - for (String dependency : missingDependencies) { - LOGGER.info("dependency from job: " + dependency); - } - // Creating missing dependencies - HadoopUtil.createFolders(sourceFS, prefix, missingDependencies); - //Adding data to empty folders depending on dataFlag - if (dataFlag) { - int tempCount = 1; - for (String location : missingDependencies) { - if (tempCount==1) { - LOGGER.info("Transferring data to : " + location); - HadoopUtil.copyDataToFolder(sourceFS, location, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml")); - tempCount++; - } - } - } - //replication should start, wait while it ends - InstanceUtil.waitTillInstanceReachState(targetOC, entityName, 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); - // Adding data for late rerun - int tempCounter = 1; - for (String dependency : missingDependencies) { - if (tempCounter==1) { - LOGGER.info("Transferring late data to : " + dependency); - HadoopUtil.copyDataToFolder(sourceFS, dependency, - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.properties")); - } - tempCounter++; - } - return missingDependencies; - } - - @DataProvider(name = "dataFlagProvider") - private Object[][] dataFlagProvider() { - return new Object[][] { - new Object[] {true, }, - new Object[] {false, }, - }; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java deleted file mode 100644 index a936aa1..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java +++ /dev/null @@ -1,581 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.feed.ActionType; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.regression.Entities.FeedMerlin; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.supportClasses.ExecResult; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.falcon.resource.InstancesResult; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.OozieClient; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import javax.xml.bind.JAXBException; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * feed replication test. - * Replicates empty directories as well as directories containing data. - */ -@Test(groups = "embedded") -public class FeedReplicationTest extends BaseTestClass { - - private ColoHelper cluster1 = servers.get(0); - private ColoHelper cluster2 = servers.get(1); - private ColoHelper cluster3 = servers.get(2); - private FileSystem cluster1FS = serverFS.get(0); - private FileSystem cluster2FS = serverFS.get(1); - private FileSystem cluster3FS = serverFS.get(2); - private OozieClient cluster2OC = serverOC.get(1); - private OozieClient cluster3OC = serverOC.get(2); - private String baseTestDir = cleanAndGetTestDir(); - private String sourcePath = baseTestDir + "/source"; - private String feedDataLocation = baseTestDir + "/source" + MINUTE_DATE_PATTERN; - private String targetPath = baseTestDir + "/target"; - private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN; - private static final Logger LOGGER = Logger.getLogger(FeedReplicationTest.class); - - @BeforeMethod(alwaysRun = true) - public void setUp() throws JAXBException, IOException { - Bundle bundle = BundleUtil.readFeedReplicationBundle(); - - bundles[0] = new Bundle(bundle, cluster1); - bundles[1] = new Bundle(bundle, cluster2); - bundles[2] = new Bundle(bundle, cluster3); - - bundles[0].generateUniqueBundle(this); - bundles[1].generateUniqueBundle(this); - bundles[2].generateUniqueBundle(this); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() throws IOException { - removeTestClassEntities(); - cleanTestsDirs(); - } - - /** - * Test demonstrates replication of stored data from one source cluster to one target cluster. - * It checks the lifecycle of replication workflow instance including its creation. When - * replication ends test checks if data was replicated correctly. - * Also checks for presence of _SUCCESS file in target directory. - */ - @Test(dataProvider = "dataFlagProvider") - public void replicate1Source1Target(boolean dataFlag) - throws Exception { - Bundle.submitCluster(bundles[0], bundles[1]); - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 5); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - - //configure feed - FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); - feed.setFilePath(feedDataLocation); - //erase all clusters from feed definition - feed.clearFeedClusters(); - //set cluster1 as source - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()); - //set cluster2 as target - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()); - feed.withProperty("job.counter", "true"); - - //submit and schedule feed - LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); - - //upload necessary data - DateTime date = new DateTime(startTime, DateTimeZone.UTC); - DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'"); - String timePattern = fmt.print(date); - String sourceLocation = sourcePath + "/" + timePattern + "/"; - String targetLocation = targetPath + "/" + timePattern + "/"; - HadoopUtil.recreateDir(cluster1FS, sourceLocation); - - Path toSource = new Path(sourceLocation); - Path toTarget = new Path(targetLocation); - if (dataFlag) { - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml")); - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt")); - } - - //check if coordinator exists - InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1); - - //replication should start, wait while it ends - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed.toString()), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); - - //check if data has been replicated correctly - List<Path> cluster1ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster1FS, toSource); - List<Path> cluster2ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster2FS, toTarget); - - AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); - - //_SUCCESS does not exist in source - Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster1FS, toSource, ""), false); - - //_SUCCESS should exist in target - Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true); - - AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()), - cluster2FS, "feed", "Success logs are not present"); - - ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName()); - AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, dataFlag); - } - - /** - * Test demonstrates replication of stored data from one source cluster to two target clusters. - * It checks the lifecycle of replication workflow instances including their creation on both - * targets. When replication ends test checks if data was replicated correctly. - * Also checks for presence of _SUCCESS file in target directory. - */ - @Test(dataProvider = "dataFlagProvider") - public void replicate1Source2Targets(boolean dataFlag) throws Exception { - Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 5); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - - //configure feed - FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); - feed.setFilePath(feedDataLocation); - //erase all clusters from feed definition - feed.clearFeedClusters(); - //set cluster1 as source - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()); - //set cluster2 as target - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()); - //set cluster3 as target - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()); - feed.withProperty("job.counter", "true"); - - //submit and schedule feed - LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); - - //upload necessary data - DateTime date = new DateTime(startTime, DateTimeZone.UTC); - DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'"); - String timePattern = fmt.print(date); - String sourceLocation = sourcePath + "/" + timePattern + "/"; - String targetLocation = targetPath + "/" + timePattern + "/"; - HadoopUtil.recreateDir(cluster1FS, sourceLocation); - - Path toSource = new Path(sourceLocation); - Path toTarget = new Path(targetLocation); - - if (dataFlag) { - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml")); - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt")); - } - - //check if all coordinators exist - InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0); - InstanceUtil.waitTillInstancesAreCreated(cluster3OC, feed.toString(), 0); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "REPLICATION"), 1); - //replication on cluster 2 should start, wait till it ends - InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); - - //replication on cluster 3 should start, wait till it ends - InstanceUtil.waitTillInstanceReachState(cluster3OC, feed.getName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); - - //check if data has been replicated correctly - List<Path> cluster1ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster1FS, toSource); - List<Path> cluster2ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster2FS, toTarget); - List<Path> cluster3ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster3FS, toTarget); - - AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); - AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster3ReplicatedData); - - //_SUCCESS does not exist in source - Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster1FS, toSource, ""), false); - - //_SUCCESS should exist in target - Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true); - Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster3FS, toTarget, ""), true); - - AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()), - cluster2FS, "feed", "Success logs are not present"); - - ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName()); - AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, dataFlag); - } - - /** - * Test demonstrates how replication depends on availability flag. Scenario includes one - * source and one target cluster. When feed is submitted and scheduled and data is available, - * feed still waits for availability flag (file which name is defined as availability flag in - * feed definition). As soon as mentioned file is got uploaded in data directory, - * replication starts and when it ends test checks if data was replicated correctly. - * Also checks for presence of availability flag in target directory. - */ - @Test(dataProvider = "dataFlagProvider") - public void availabilityFlagTest(boolean dataFlag) throws Exception { - //replicate1Source1Target scenario + set availability flag but don't upload required file - Bundle.submitCluster(bundles[0], bundles[1]); - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 5); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - - //configure feed - String availabilityFlagName = "availabilityFlag.txt"; - String feedName = Util.readEntityName(bundles[0].getDataSets().get(0)); - FeedMerlin feedElement = bundles[0].getFeedElement(feedName); - feedElement.setAvailabilityFlag(availabilityFlagName); - bundles[0].writeFeedElement(feedElement, feedName); - FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); - feed.setFilePath(feedDataLocation); - //erase all clusters from feed definition - feed.clearFeedClusters(); - //set cluster1 as source - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()); - //set cluster2 as target - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()); - feed.withProperty("job.counter", "true"); - - //submit and schedule feed - LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); - - //upload necessary data - DateTime date = new DateTime(startTime, DateTimeZone.UTC); - DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'"); - String timePattern = fmt.print(date); - String sourceLocation = sourcePath + "/" + timePattern + "/"; - String targetLocation = targetPath + "/" + timePattern + "/"; - HadoopUtil.recreateDir(cluster1FS, sourceLocation); - - Path toSource = new Path(sourceLocation); - Path toTarget = new Path(targetLocation); - if (dataFlag) { - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml")); - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt")); - } - - //check while instance is got created - InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0); - - //check if coordinator exists - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 1); - - //replication should not start even after time - TimeUtil.sleepSeconds(60); - InstancesResult r = prism.getFeedHelper().getProcessInstanceStatus(feedName, - "?start=" + startTime + "&end=" + endTime); - InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0); - LOGGER.info("Replication didn't start."); - - //create availability flag on source - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.RESOURCES, availabilityFlagName)); - - //check if instance become running - InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, - CoordinatorAction.Status.RUNNING, EntityType.FEED); - - //wait till instance succeed - InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); - - //check if data was replicated correctly - List<Path> cluster1ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster1FS, toSource); - LOGGER.info("Data on source cluster: " + cluster1ReplicatedData); - List<Path> cluster2ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster2FS, toTarget); - LOGGER.info("Data on target cluster: " + cluster2ReplicatedData); - AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); - - //availabilityFlag exists in source - Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster1FS, toSource, availabilityFlagName), true); - - //availabilityFlag should exist in target - Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, availabilityFlagName), true); - - AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()), - cluster2FS, "feed", "Success logs are not present"); - - ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName()); - AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, dataFlag); - } - - /** - * Test for https://issues.apache.org/jira/browse/FALCON-668. - * Check that new DistCp options are allowed. - */ - @Test - public void testNewDistCpOptions() throws Exception { - Bundle.submitCluster(bundles[0], bundles[1]); - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 5); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - //configure feed - String feedName = Util.readEntityName(bundles[0].getDataSets().get(0)); - FeedMerlin feedElement = bundles[0].getFeedElement(feedName); - bundles[0].writeFeedElement(feedElement, feedName); - FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); - feed.setFilePath(feedDataLocation); - //erase all clusters from feed definition - feed.clearFeedClusters(); - //set cluster1 as source - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()); - //set cluster2 as target - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()); - feed.withProperty("job.counter", "true"); - - //add custom properties to feed - HashMap<String, String> propMap = new HashMap<>(); - propMap.put("overwrite", "true"); - propMap.put("ignoreErrors", "false"); - propMap.put("skipChecksum", "false"); - propMap.put("removeDeletedFiles", "true"); - propMap.put("preserveBlockSize", "true"); - propMap.put("preserveReplicationNumber", "true"); - propMap.put("preservePermission", "true"); - for (Map.Entry<String, String> entry : propMap.entrySet()) { - feed.withProperty(entry.getKey(), entry.getValue()); - } - //add custom property which shouldn't be passed to workflow - HashMap<String, String> unsupportedPropMap = new HashMap<>(); - unsupportedPropMap.put("myCustomProperty", "true"); - feed.withProperty("myCustomProperty", "true"); - - //upload necessary data to source - DateTime date = new DateTime(startTime, DateTimeZone.UTC); - DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'"); - String timePattern = fmt.print(date); - String sourceLocation = sourcePath + "/" + timePattern + "/"; - HadoopUtil.recreateDir(cluster1FS, sourceLocation); - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml")); - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt")); - - //copy 2 files to target to check if they will be deleted because of removeDeletedFiles property - String targetLocation = targetPath + "/" + timePattern + "/"; - cluster2FS.copyFromLocalFile(new Path(OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile3.txt")), - new Path(targetLocation + "dataFile3.txt")); - - //submit and schedule feed - LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); - - //check while instance is got created - InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0); - - //check if coordinator exists and replication starts - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1); - InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, - CoordinatorAction.Status.RUNNING, EntityType.FEED); - - //check that properties were passed to workflow definition - String bundleId = OozieUtil.getLatestBundleID(cluster2OC, feedName, EntityType.FEED); - String coordId = OozieUtil.getReplicationCoordID(bundleId, cluster2.getFeedHelper()).get(0); - CoordinatorAction coordinatorAction = cluster2OC.getCoordJobInfo(coordId).getActions().get(0); - String wfDefinition = cluster2OC.getJobDefinition(coordinatorAction.getExternalId()); - LOGGER.info(String.format("Definition of coordinator job action %s : \n %s \n", - coordinatorAction.getExternalId(), Util.prettyPrintXml(wfDefinition))); - Assert.assertTrue(OozieUtil.propsArePresentInWorkflow(wfDefinition, "replication", propMap), - "New distCp supported properties should be passed to replication args list."); - Assert.assertFalse(OozieUtil.propsArePresentInWorkflow(wfDefinition, "replication", unsupportedPropMap), - "Unsupported properties shouldn't be passed to replication args list."); - - //check that replication succeeds - InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); - - List<Path> finalFiles = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS, new Path(targetPath)); - Assert.assertEquals(finalFiles.size(), 2, "Only replicated files should be present on target " - + "because of 'removeDeletedFiles' distCp property."); - - ExecResult execResult = cluster1.getFeedHelper().getCLIMetrics(feed.getName()); - AssertUtil.assertCLIMetrics(execResult, feed.getName(), 1, true); - } - - /** - * Test demonstrates failure pf replication of stored data from one source cluster to one target cluster. - * When replication job fails test checks if failed logs are present in staging directory or not. - */ - @Test - public void replicate1Source1TargetFail() - throws Exception { - Bundle.submitCluster(bundles[0], bundles[1]); - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 5); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - - //configure feed - FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); - feed.setFilePath(feedDataLocation); - //erase all clusters from feed definition - feed.clearFeedClusters(); - //set cluster1 as source - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()); - //set cluster2 as target - feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()); - - //submit and schedule feed - LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); - - //upload necessary data - DateTime date = new DateTime(startTime, DateTimeZone.UTC); - DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'"); - String timePattern = fmt.print(date); - String sourceLocation = sourcePath + "/" + timePattern + "/"; - String targetLocation = targetPath + "/" + timePattern + "/"; - HadoopUtil.recreateDir(cluster1FS, sourceLocation); - - Path toSource = new Path(sourceLocation); - Path toTarget = new Path(targetLocation); - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml")); - HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile1.txt")); - - //check if coordinator exists - InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1); - - //check if instance become running - InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, - CoordinatorAction.Status.RUNNING, EntityType.FEED); - - HadoopUtil.deleteDirIfExists(sourceLocation, cluster1FS); - - //check if instance became killed - InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, - CoordinatorAction.Status.KILLED, EntityType.FEED); - - AssertUtil.assertLogMoverPath(false, Util.readEntityName(feed.toString()), - cluster2FS, "feed", "Success logs are not present"); - } - - /* Flag value denotes whether to add data for replication or not. - * flag=true : add data for replication. - * flag=false : let empty directories be replicated. - */ - @DataProvider(name = "dataFlagProvider") - private Object[][] dataFlagProvider() { - return new Object[][] { - new Object[] {true, }, - new Object[] {false, }, - }; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java deleted file mode 100644 index ec117d7..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedResumeTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper; -import org.apache.falcon.regression.core.response.ServiceResponse; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - -/** - * Feed resume tests. - */ -@Test(groups = "embedded") -public class FeedResumeTest extends BaseTestClass { - - private final AbstractEntityHelper feedHelper = prism.getFeedHelper(); - private String feed; - private ColoHelper cluster = servers.get(0); - private OozieClient clusterOC = serverOC.get(0); - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0].generateUniqueBundle(this); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].submitClusters(prism); - feed = bundles[0].getInputFeedFromBundle(); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Launches feed, suspends it and then resumes and checks if it got running. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void resumeSuspendedFeed() throws Exception { - AssertUtil.assertSucceeded(feedHelper.submitAndSchedule(feed)); - AssertUtil.assertSucceeded(feedHelper.suspend(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED); - AssertUtil.assertSucceeded(feedHelper.resume(feed)); - ServiceResponse response = feedHelper.getStatus(feed); - String colo = feedHelper.getColo(); - Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING")); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - } - - - /** - * Tries to resume feed that wasn't submitted and scheduled. Attempt should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void resumeNonExistentFeed() throws Exception { - AssertUtil.assertFailed(feedHelper.resume(feed)); - } - - /** - * Tries to resume deleted feed. Attempt should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void resumeDeletedFeed() throws Exception { - AssertUtil.assertSucceeded(feedHelper.submitAndSchedule(feed)); - AssertUtil.assertSucceeded(feedHelper.delete(feed)); - AssertUtil.assertFailed(feedHelper.resume(feed)); - } - - /** - * Tries to resume scheduled feed which wasn't suspended. Feed status shouldn't change. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void resumeScheduledFeed() throws Exception { - AssertUtil.assertSucceeded(feedHelper.submitAndSchedule(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - AssertUtil.assertSucceeded(feedHelper.resume(feed)); - ServiceResponse response = feedHelper.getStatus(feed); - String colo = feedHelper.getColo(); - Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING")); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java deleted file mode 100644 index 28ddbd7..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSLATest.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.ActionType; -import org.apache.falcon.regression.Entities.FeedMerlin; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.response.ServiceResponse; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.log4j.Logger; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - -/** - * Feed SLA tests. - */ -@Test(groups = "embedded") -public class FeedSLATest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private String baseTestDir = cleanAndGetTestDir(); - private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN; - private static final Logger LOGGER = Logger.getLogger(FeedSLATest.class); - - private FeedMerlin feedMerlin; - private String startTime; - private String endTime; - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - Bundle bundle = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundle, cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(feedInputPath); - - startTime = TimeUtil.getTimeWrtSystemTime(0); - endTime = TimeUtil.addMinsToTime(startTime, 120); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - ServiceResponse response = - prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0)); - AssertUtil.assertSucceeded(response); - - feedMerlin = new FeedMerlin(bundles[0].getInputFeedFromBundle()); - feedMerlin.setFrequency(new Frequency("1", Frequency.TimeUnit.hours)); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Submit feed with correctly adjusted sla. Response should reflect success. - * - */ - - @Test - public void submitValidFeedSLA() throws Exception { - - feedMerlin.clearFeedClusters(); - feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder( - Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .build()); - - //set slaLow and slaHigh - feedMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours)); - - final ServiceResponse serviceResponse = - prism.getFeedHelper().submitEntity(feedMerlin.toString()); - AssertUtil.assertSucceeded(serviceResponse); - } - - /** - * Submit feed with slaHigh greater than feed retention. Response should reflect failure. - * - */ - - @Test - public void submitFeedWithSLAHigherThanRetention() throws Exception { - - feedMerlin.clearFeedClusters(); - feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder( - Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention((new Frequency("2", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE) - .withValidity(startTime, endTime) - .build()); - - //set slaLow and slaHigh - feedMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours)); - - final ServiceResponse serviceResponse = - prism.getFeedHelper().submitEntity(feedMerlin.toString()); - String message = "Feed's retention limit: " - + feedMerlin.getClusters().getClusters().get(0).getRetention().getLimit() - + " of referenced cluster " + bundles[0].getClusterNames().get(0) - + " should be more than feed's late arrival cut-off period: " - + feedMerlin.getSla().getSlaHigh().getTimeUnit() - + "(" + feedMerlin.getSla().getSlaHigh().getFrequency() + ")" - + " for feed: " + bundles[0].getInputFeedNameFromBundle(); - validate(serviceResponse, message); - } - - - /** - * Submit feed with slaHigh less than slaLow. Response should reflect failure. - * - */ - @Test - public void submitFeedWithSLAHighLowerthanSLALow() throws Exception { - - feedMerlin.clearFeedClusters(); - feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder( - Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention((new Frequency("6", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE) - .withValidity(startTime, endTime) - .build()); - - //set slaLow and slaHigh - feedMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours), new Frequency("2", Frequency.TimeUnit.hours)); - - final ServiceResponse serviceResponse = - prism.getFeedHelper().submitEntity(feedMerlin.toString()); - String message = "slaLow of Feed: " + feedMerlin.getSla().getSlaLow().getTimeUnit() + "(" - + feedMerlin.getSla().getSlaLow().getFrequency() + ")is greater than slaHigh: " - + feedMerlin.getSla().getSlaHigh().getTimeUnit() + "(" + feedMerlin.getSla().getSlaHigh().getFrequency() - + ") for cluster: " + bundles[0].getClusterNames().get(0); - validate(serviceResponse, message); - } - - /** - * Submit feed with slaHigh and slaLow greater than feed retention. Response should reflect failure. - * - */ - @Test - public void submitFeedWithSLAHighSLALowHigherThanRetention() throws Exception { - - feedMerlin.clearFeedClusters(); - feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder( - Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention((new Frequency("4", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE) - .withValidity(startTime, endTime) - .build()); - - //set slaLow and slaHigh - feedMerlin.setSla(new Frequency("5", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours)); - - final ServiceResponse serviceResponse = - prism.getFeedHelper().submitEntity(feedMerlin.toString()); - String message = "Feed's retention limit: " - + feedMerlin.getClusters().getClusters().get(0).getRetention().getLimit() - + " of referenced cluster " + bundles[0].getClusterNames().get(0) - + " should be more than feed's late arrival cut-off period: " - + feedMerlin.getSla().getSlaHigh().getTimeUnit() +"(" + feedMerlin.getSla().getSlaHigh().getFrequency() - + ")" + " for feed: " + bundles[0].getInputFeedNameFromBundle(); - validate(serviceResponse, message); - } - - /** - * Submit feed with slaHigh and slaLow having equal value. Response should reflect success. - * - */ - @Test - public void submitFeedWithSameSLAHighSLALow() throws Exception { - - feedMerlin.clearFeedClusters(); - feedMerlin.addFeedCluster(new FeedMerlin.FeedClusterBuilder( - Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention((new Frequency("7", Frequency.TimeUnit.hours)).toString(), ActionType.DELETE) - .withValidity(startTime, endTime) - .build()); - - //set slaLow and slaHigh - feedMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("3", Frequency.TimeUnit.hours)); - - final ServiceResponse serviceResponse = - prism.getFeedHelper().submitEntity(feedMerlin.toString()); - AssertUtil.assertSucceeded(serviceResponse); - } - - private void validate(ServiceResponse response, String message) throws Exception { - AssertUtil.assertFailed(response); - LOGGER.info("Expected message is : " + message); - Assert.assertTrue(response.getMessage().contains(message), - "Correct response was not present in feed schedule. Feed response is : " - + response.getMessage()); - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java deleted file mode 100644 index 79b722a..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedScheduleTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.response.ServiceResponse; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - -/** - * Feed schedule tests. - */ -@Test(groups = "embedded") -public class FeedScheduleTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private OozieClient clusterOC = serverOC.get(0); - private String feed; - - @BeforeMethod(alwaysRun = true) - public void setUp() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - Bundle.submitCluster(bundles[0]); - feed = bundles[0].getInputFeedFromBundle(); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Tries to schedule already scheduled feed. Request should be considered as correct. - * Feed status shouldn't change. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void scheduleAlreadyScheduledFeed() throws Exception { - ServiceResponse response = prism.getFeedHelper().submitEntity(feed); - AssertUtil.assertSucceeded(response); - - response = prism.getFeedHelper().schedule(feed); - AssertUtil.assertSucceeded(response); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - - //now try re-scheduling again - response = prism.getFeedHelper().schedule(feed); - AssertUtil.assertSucceeded(response); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - } - - /** - * Schedule correct feed. Feed should got running. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void scheduleValidFeed() throws Exception { - //submit feed - ServiceResponse response = prism.getFeedHelper().submitEntity(feed); - AssertUtil.assertSucceeded(response); - - //now schedule the thing - response = prism.getFeedHelper().schedule(feed); - AssertUtil.assertSucceeded(response); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - } - - /** - * Tries to schedule already scheduled and suspended feed. Suspended status shouldn't change. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void scheduleSuspendedFeed() throws Exception { - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); - - //now suspend - AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED); - //now schedule this! - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED); - } - - /** - * Schedules and deletes feed. Tries to schedule it. Request should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void scheduleKilledFeed() throws Exception { - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); - - //now suspend - AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED); - //now schedule this! - AssertUtil.assertFailed(prism.getFeedHelper().schedule(feed)); - } - - /** - * Tries to schedule feed which wasn't submitted. Request should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void scheduleNonExistentFeed() throws Exception { - AssertUtil.assertFailed(prism.getFeedHelper().schedule(feed)); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java deleted file mode 100644 index d5e8696..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedStatusTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.response.ServiceResponse; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.log4j.Logger; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - -/** - * Feed status tests. Checks getStatus functionality. - */ -@Test(groups = "embedded") -public class FeedStatusTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private OozieClient clusterOC = serverOC.get(0); - private String feed; - private static final Logger LOGGER = Logger.getLogger(FeedStatusTest.class); - - - @BeforeMethod(alwaysRun = true) - public void setUp() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0].generateUniqueBundle(this); - bundles[0] = new Bundle(bundles[0], cluster); - - //submit the cluster - ServiceResponse response = - prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0)); - AssertUtil.assertSucceeded(response); - feed = bundles[0].getInputFeedFromBundle(); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Schedules feed. Queries a feed status and checks the response - * correctness and a feed status correspondence. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getStatusForScheduledFeed() throws Exception { - ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed); - LOGGER.info("Feed: " + Util.prettyPrintXml(feed)); - AssertUtil.assertSucceeded(response); - - response = prism.getFeedHelper().getStatus(feed); - - AssertUtil.assertSucceeded(response); - - String colo = prism.getFeedHelper().getColo(); - Assert.assertTrue(response.getMessage().contains(colo + "/RUNNING")); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - } - - /** - * Schedules and suspends feed. Queries a feed status and checks the response - * correctness and a feed status correspondence. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getStatusForSuspendedFeed() throws Exception { - ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed); - - AssertUtil.assertSucceeded(response); - - response = prism.getFeedHelper().suspend(feed); - AssertUtil.assertSucceeded(response); - - response = prism.getFeedHelper().getStatus(feed); - - AssertUtil.assertSucceeded(response); - String colo = prism.getFeedHelper().getColo(); - Assert.assertTrue(response.getMessage().contains(colo + "/SUSPENDED")); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED); - } - - /** - * Submits feed. Queries a feed status and checks the response - * correctness and a feed status correspondence. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getStatusForSubmittedFeed() throws Exception { - ServiceResponse response = prism.getFeedHelper().submitEntity(feed); - - AssertUtil.assertSucceeded(response); - - response = prism.getFeedHelper().getStatus(feed); - - AssertUtil.assertSucceeded(response); - String colo = prism.getFeedHelper().getColo(); - Assert.assertTrue(response.getMessage().contains(colo + "/SUBMITTED")); - AssertUtil.checkNotStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - } - - /** - * Removes feed. Queries a feed status. Checks that the response correctness. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getStatusForDeletedFeed() throws Exception { - ServiceResponse response = prism.getFeedHelper().submitEntity(feed); - AssertUtil.assertSucceeded(response); - - response = prism.getFeedHelper().delete(feed); - AssertUtil.assertSucceeded(response); - - response = prism.getFeedHelper().getStatus(feed); - AssertUtil.assertFailed(response); - - Assert.assertTrue( - response.getMessage().contains(Util.readEntityName(feed) + " (FEED) not found")); - AssertUtil.checkNotStatus(clusterOC, EntityType.FEED, feed, Job.Status.KILLED); - } - - /** - * Queries a status of feed which wasn't submitted and checks the response. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getStatusForNonExistentFeed() throws Exception { - ServiceResponse response = prism.getFeedHelper().getStatus(feed); - AssertUtil.assertFailed(response); - Assert.assertTrue( - response.getMessage().contains(Util.readEntityName(feed) + " (FEED) not found")); - - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java deleted file mode 100644 index f7bf0f8..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.regression.Entities.FeedMerlin; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.response.ServiceResponse; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import javax.xml.bind.JAXBException; -import java.io.IOException; -import java.net.URISyntaxException; - -/** - * Feed submit and schedule tests. - */ -@Test(groups = "embedded") -public class FeedSubmitAndScheduleTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private OozieClient clusterOC = serverOC.get(0); - private String feed; - - @BeforeMethod(alwaysRun = true) - public void setUp() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - feed = bundles[0].getDataSets().get(0); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - //remove entities which belong to both default and different user - removeTestClassEntities(null, MerlinConstants.DIFFERENT_USER_NAME); - } - - @Test(groups = {"singleCluster"}) - public void snsNewFeed() throws Exception { - submitFirstClusterScheduleFirstFeed(); - } - - /** - * Submits and schedules feed with a cluster it depends on. - * - * @throws JAXBException - * @throws IOException - * @throws URISyntaxException - * @throws AuthenticationException - */ - private void submitFirstClusterScheduleFirstFeed() - throws JAXBException, IOException, URISyntaxException, AuthenticationException, - InterruptedException { - AssertUtil.assertSucceeded(prism.getClusterHelper() - .submitEntity(bundles[0].getClusters().get(0))); - ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed); - AssertUtil.assertSucceeded(response); - } - - /** - * Submits and schedules a feed and then tries to do the same on it. Checks that status - * hasn't changed and response is successful. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void snsExistingFeed() throws Exception { - submitFirstClusterScheduleFirstFeed(); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING); - - //get created bundle id - String bundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED); - - //try to submit and schedule the same process again - ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed); - AssertUtil.assertSucceeded(response); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING); - - //check that new bundle wasn't created - OozieUtil.verifyNewBundleCreation(clusterOC, bundleId, null, feed, false, false); - } - - /** - * Try to submit and schedule feed without submitting cluster it depends on. - * Request should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void snsFeedWithoutCluster() throws Exception { - ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed); - AssertUtil.assertFailed(response); - } - - /** - * Submits and schedules feed. Removes it. Submitted and schedules removed feed. - * Checks response and status of feed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void snsDeletedFeed() throws Exception { - submitFirstClusterScheduleFirstFeed(); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING); - AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.KILLED); - ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed); - AssertUtil.assertSucceeded(response); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING); - } - - /** - * Suspends feed, submit and schedules it. Checks that response is successful, - * feed status hasn't changed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void snsSuspendedFeed() throws Exception { - submitFirstClusterScheduleFirstFeed(); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING); - AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED); - ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed); - AssertUtil.assertSucceeded(response); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED); - } - - /** - * Test for https://issues.apache.org/jira/browse/FALCON-1647. - * Create cluster entity as user1. Submit and schedule feed entity feed1 in this cluster as user1. - * Now try to submit and schedule a feed entity feed2 in this cluster as user2. - */ - @Test - public void snsDiffFeedDiffUserSameCluster() - throws URISyntaxException, AuthenticationException, InterruptedException, IOException, JAXBException { - bundles[0].submitClusters(prism); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); - FeedMerlin feedMerlin = FeedMerlin.fromString(feed); - feedMerlin.setName(feedMerlin.getName() + "-2"); - feedMerlin.setACL(MerlinConstants.DIFFERENT_USER_NAME, MerlinConstants.DIFFERENT_USER_GROUP, "*"); - ServiceResponse response = prism.getFeedHelper().submitAndSchedule( - feedMerlin.toString(), MerlinConstants.DIFFERENT_USER_NAME, null); - AssertUtil.assertSucceeded(response); - } -}
