http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java index ced5fbf..11763c3 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java @@ -18,6 +18,7 @@ package org.apache.falcon.regression; +import org.apache.falcon.regression.Entities.FeedMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.ActionType; @@ -42,15 +43,10 @@ import java.net.URISyntaxException; import java.util.List; /** - * feed late data test - */ - -/* * This test submits and schedules feed and then check for replication. * On adding further late data it checks whether the data has been replicated correctly in the given late cut-off time. * Assuming that late frequency set in server is 3 minutes. Although value can be changed according to requirement. */ - public class FeedLateRerunTest extends BaseTestClass { private ColoHelper cluster1 = servers.get(0); @@ -85,9 +81,9 @@ public class FeedLateRerunTest extends BaseTestClass { } @Test(enabled = true) - public void FeedLateRerunTestWithEmptyFolders () - throws AuthenticationException, IOException, URISyntaxException, JAXBException, - OozieClientException, InterruptedException { + public void feedLateRerunTestWithEmptyFolders() + throws AuthenticationException, IOException, URISyntaxException, JAXBException, + OozieClientException, InterruptedException { Bundle.submitCluster(bundles[0], bundles[1]); String startTime = TimeUtil.getTimeWrtSystemTime(0); String endTime = TimeUtil.addMinsToTime(startTime, 30); @@ -97,22 +93,22 @@ public class FeedLateRerunTest extends BaseTestClass { String feed = bundles[0].getDataSets().get(0); feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation); //erase all clusters from feed definition - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"), - XmlUtil.createRetention("days(1000000)", ActionType.DELETE), null, - ClusterType.SOURCE, null); + feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); //set cluster1 as source - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, endTime), - XmlUtil.createRetention("days(1000000)", ActionType.DELETE), - Util.readEntityName(bundles[0].getClusters().get(0)), - ClusterType.SOURCE, null); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.SOURCE) + .build()).toString(); //set cluster2 as target - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, endTime), - XmlUtil.createRetention("days(1000000)", ActionType.DELETE), - Util.readEntityName(bundles[1].getClusters().get(0)), - ClusterType.TARGET, null, targetDataLocation); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.TARGET) + .withDataLocation(targetDataLocation) + .build()).toString(); String entityName = Util.readEntityName(feed); @@ -124,8 +120,8 @@ public class FeedLateRerunTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, - "REPLICATION"), 1); + .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, + "REPLICATION"), 1); //Finding bundleId of replicated instance on target @@ -133,17 +129,18 @@ public class FeedLateRerunTest extends BaseTestClass { //Finding and creating missing dependencies - List<String> missingDependencies = getAndCreateDependencies(cluster1,cluster1FS,cluster2,cluster2OC, bundleId, false, entityName); + List<String> missingDependencies = getAndCreateDependencies( + cluster1, cluster1FS, cluster2, cluster2OC, bundleId, false, entityName); int count = 1; for (String location : missingDependencies) { - if(count==1) { + if (count==1) { source = location; count++; } } - source=splitPathFromIp(source,"8020"); + source=splitPathFromIp(source, "8020"); LOGGER.info("source : " + source); target = source.replace("source", "target"); LOGGER.info("target : " + target); @@ -155,20 +152,20 @@ public class FeedLateRerunTest extends BaseTestClass { */ int sleepMins = 8; - for(int i=0; i < sleepMins ; i++) { + for(int i=0; i < sleepMins; i++) { LOGGER.info("Waiting..."); TimeUtil.sleepSeconds(60); } String bundleID = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED); - OozieUtil.validateRetryAttempts(cluster2, bundleID,EntityType.FEED, 1); + OozieUtil.validateRetryAttempts(cluster2, bundleID, EntityType.FEED, 1); //check if data has been replicated correctly List<Path> cluster1ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source)); + .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source)); List<Path> cluster2ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target)); + .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target)); AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); @@ -177,9 +174,9 @@ public class FeedLateRerunTest extends BaseTestClass { @Test(enabled = true) - public void FeedLateRerunTestWithData () - throws AuthenticationException, IOException, URISyntaxException, JAXBException, - OozieClientException, InterruptedException { + public void feedLateRerunTestWithData() + throws AuthenticationException, IOException, URISyntaxException, JAXBException, + OozieClientException, InterruptedException { Bundle.submitCluster(bundles[0], bundles[1]); String startTime = TimeUtil.getTimeWrtSystemTime(0); String endTime = TimeUtil.addMinsToTime(startTime, 30); @@ -189,22 +186,22 @@ public class FeedLateRerunTest extends BaseTestClass { String feed = bundles[0].getDataSets().get(0); feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation); //erase all clusters from feed definition - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"), - XmlUtil.createRetention("days(1000000)", ActionType.DELETE), null, - ClusterType.SOURCE, null); + feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); //set cluster1 as source - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, endTime), - XmlUtil.createRetention("days(1000000)", ActionType.DELETE), - Util.readEntityName(bundles[0].getClusters().get(0)), - ClusterType.SOURCE, null); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.SOURCE) + .build()).toString(); //set cluster2 as target - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, endTime), - XmlUtil.createRetention("days(1000000)", ActionType.DELETE), - Util.readEntityName(bundles[1].getClusters().get(0)), - ClusterType.TARGET, null, targetDataLocation); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.TARGET) + .withDataLocation(targetDataLocation) + .build()).toString(); String entityName = Util.readEntityName(feed); @@ -216,26 +213,27 @@ public class FeedLateRerunTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, - "REPLICATION"), 1); + .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, + "REPLICATION"), 1); //Finding bundleId of replicated instance on target String bundleId = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED); //Finding and creating missing dependencies - List<String> missingDependencies = getAndCreateDependencies(cluster1,cluster1FS,cluster2,cluster2OC, bundleId, true, entityName); + List<String> missingDependencies = getAndCreateDependencies( + cluster1, cluster1FS, cluster2, cluster2OC, bundleId, true, entityName); int count = 1; for (String location : missingDependencies) { - if(count==1) { + if (count==1) { source = location; count++; } } LOGGER.info("source : " + source); - source=splitPathFromIp(source,"8020"); + source=splitPathFromIp(source, "8020"); LOGGER.info("source : " + source); target = source.replace("source", "target"); LOGGER.info("target : " + target); @@ -247,29 +245,29 @@ public class FeedLateRerunTest extends BaseTestClass { */ int sleepMins = 8; - for(int i=0; i < sleepMins ; i++) { + for(int i=0; i < sleepMins; i++) { LOGGER.info("Waiting..."); TimeUtil.sleepSeconds(60); } String bundleID = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED); - OozieUtil.validateRetryAttempts(cluster2, bundleID,EntityType.FEED, 1); + OozieUtil.validateRetryAttempts(cluster2, bundleID, EntityType.FEED, 1); //check if data has been replicated correctly List<Path> cluster1ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source)); + .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source)); List<Path> cluster2ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target)); + .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target)); AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); } - private String splitPathFromIp (String src,String port) { - String req_src,tempSrc=""; - if(src.contains(":")) { - String tempPath[] = src.split(":"); + private String splitPathFromIp(String src, String port) { + String reqSrc, tempSrc = ""; + if (src.contains(":")) { + String[] tempPath = src.split(":"); for (String aTempPath : tempPath) { if (aTempPath.startsWith(port)) { tempSrc = aTempPath; @@ -277,21 +275,23 @@ public class FeedLateRerunTest extends BaseTestClass { } } - if(tempSrc.isEmpty()) { - req_src=src; - } - else { - req_src=tempSrc.replace(port,""); + if (tempSrc.isEmpty()) { + reqSrc = src; + } else { + reqSrc=tempSrc.replace(port, ""); } - return req_src; + return reqSrc; } /* prismHelper1 - source colo prismHelper2 - target colo */ - private List<String> getAndCreateDependencies (ColoHelper prismHelper1, FileSystem clusterFS1, ColoHelper prismHelper2,OozieClient oozieClient2, String bundleId, boolean dataFlag, String entityName) - throws OozieClientException, IOException { + private List<String> getAndCreateDependencies(ColoHelper prismHelper1, FileSystem clusterFS1, + ColoHelper prismHelper2, + OozieClient oozieClient2, String bundleId, + boolean dataFlag, String entityName) + throws OozieClientException, IOException { List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId); for (int i = 0; i < 10 && missingDependencies == null; ++i) { @@ -310,10 +310,10 @@ public class FeedLateRerunTest extends BaseTestClass { InstanceUtil.createHDFSFolders(prismHelper1, missingDependencies); //Adding data to empty folders depending on dataFlag - if(dataFlag) { + if (dataFlag) { int tempCount = 1; for (String location : missingDependencies) { - if(tempCount==1) { + if (tempCount==1) { LOGGER.info("Transferring data to : " + location); HadoopUtil.copyDataToFolder(clusterFS1, location, OSUtil.RESOURCES + "feed-s4Replication.xml"); tempCount++; @@ -323,13 +323,13 @@ public class FeedLateRerunTest extends BaseTestClass { //replication should start, wait while it ends InstanceUtil.waitTillInstanceReachState(oozieClient2, entityName, 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); + CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); // Adding data for late rerun int tempCounter = 1; for (String dependency : missingDependencies) { - if(tempCounter==1) { + if (tempCounter==1) { LOGGER.info("Transferring late data to : " + dependency); HadoopUtil.copyDataToFolder(clusterFS1, dependency, OSUtil.RESOURCES + "log4j.properties"); }
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java index 0c160ea..b7afad4 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java @@ -104,8 +104,8 @@ public class FeedReplicationTest extends BaseTestClass { */ @Test public void replicate1Source1Target() - throws AuthenticationException, IOException, URISyntaxException, JAXBException, - OozieClientException, InterruptedException { + throws AuthenticationException, IOException, URISyntaxException, JAXBException, + OozieClientException, InterruptedException { Bundle.submitCluster(bundles[0], bundles[1]); String startTime = TimeUtil.getTimeWrtSystemTime(0); String endTime = TimeUtil.addMinsToTime(startTime, 5); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java index 1c8b176..c020127 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedSubmitAndScheduleTest.java @@ -81,8 +81,8 @@ public class FeedSubmitAndScheduleTest extends BaseTestClass { * @throws AuthenticationException */ private void submitFirstClusterScheduleFirstFeed() - throws JAXBException, IOException, URISyntaxException, AuthenticationException, - InterruptedException { + throws JAXBException, IOException, URISyntaxException, AuthenticationException, + InterruptedException { AssertUtil.assertSucceeded(prism.getClusterHelper() .submitEntity(bundles[0].getClusters().get(0))); ServiceResponse response = prism.getFeedHelper().submitAndSchedule(feed); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java index 5df9d05..0833f06 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java @@ -96,8 +96,8 @@ public class InstanceParamTest extends BaseTestClass { */ @Test(timeOut = 1200000, enabled = false) public void getParamsValidRequestInstanceWaiting() - throws URISyntaxException, JAXBException, AuthenticationException, IOException, - OozieClientException, InterruptedException { + throws URISyntaxException, JAXBException, AuthenticationException, IOException, + OozieClientException, InterruptedException { processBundle.setProcessValidity(startTime, endTime); processBundle.addClusterToBundle(bundles[1].getClusters().get(0), ClusterType.SOURCE, null, null); @@ -115,8 +115,8 @@ public class InstanceParamTest extends BaseTestClass { */ @Test(timeOut = 1200000, enabled = true) public void getParamsValidRequestInstanceSucceeded() - throws URISyntaxException, JAXBException, AuthenticationException, IOException, - OozieClientException, InterruptedException { + throws URISyntaxException, JAXBException, AuthenticationException, IOException, + OozieClientException, InterruptedException { processBundle.setProcessValidity(startTime, endTime); processBundle.addClusterToBundle(bundles[1].getClusters().get(0), ClusterType.SOURCE, null, null); @@ -134,12 +134,11 @@ public class InstanceParamTest extends BaseTestClass { /** * Schedule process. Wait till instance got killed. Get its params. - * TODO: change according to test case */ @Test(timeOut = 1200000, enabled = false) public void getParamsValidRequestInstanceKilled() - throws URISyntaxException, JAXBException, AuthenticationException, IOException, - OozieClientException, InterruptedException { + throws URISyntaxException, JAXBException, AuthenticationException, IOException, + OozieClientException, InterruptedException { processBundle.setProcessValidity(startTime, endTime); processBundle.addClusterToBundle(bundles[1].getClusters().get(0), ClusterType.SOURCE, null, null); @@ -149,7 +148,7 @@ public class InstanceParamTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0); OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(cluster1OC, processName, 0, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); //change according to test case InstancesResult r = prism.getProcessHelper() .getInstanceParams(processName, "?start=" + startTime); r.getMessage(); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java index 77b82ae..8395476 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java @@ -60,15 +60,15 @@ import java.util.List; @Test(groups = "embedded") public class InstanceSummaryTest extends BaseTestClass { - String baseTestHDFSDir = baseHDFSDir + "/InstanceSummaryTest"; - String feedInputPath = baseTestHDFSDir + "/testInputData" + MINUTE_DATE_PATTERN; - String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; - String startTime; - String endTime; - ColoHelper cluster3 = servers.get(2); - Bundle processBundle; - private static final Logger logger = Logger.getLogger(InstanceSummaryTest.class); - String processName; + private String baseTestHDFSDir = baseHDFSDir + "/InstanceSummaryTest"; + private String feedInputPath = baseTestHDFSDir + "/testInputData" + MINUTE_DATE_PATTERN; + private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; + private String startTime; + private String endTime; + private ColoHelper cluster3 = servers.get(2); + private Bundle processBundle; + private static final Logger LOGGER = Logger.getLogger(InstanceSummaryTest.class); + private String processName; @BeforeClass(alwaysRun = true) public void createTestData() throws Exception { @@ -86,7 +86,7 @@ public class InstanceSummaryTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setup(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); processBundle = BundleUtil.readELBundle(); processBundle = new Bundle(processBundle, cluster3); processBundle.generateUniqueBundle(); @@ -103,12 +103,11 @@ public class InstanceSummaryTest extends BaseTestClass { /** * Schedule single-cluster process. Get its instances summary. - * TODO: should be complete */ @Test(enabled = true, timeOut = 1200000) public void testSummarySingleClusterProcess() - throws URISyntaxException, JAXBException, IOException, ParseException, - OozieClientException, AuthenticationException, InterruptedException { + throws URISyntaxException, JAXBException, IOException, ParseException, + OozieClientException, AuthenticationException, InterruptedException { processBundle.setProcessValidity(startTime, endTime); processBundle.submitFeedsScheduleProcess(prism); InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 0); @@ -168,7 +167,7 @@ public class InstanceSummaryTest extends BaseTestClass { // both start end out od range r = prism.getProcessHelper().getInstanceSummary(processName, - "?start=" + TimeUtil.addMinsToTime(startTime,-100) + "?start=" + TimeUtil.addMinsToTime(startTime, -100) + "&end=" + TimeUtil.addMinsToTime(endTime, 100)); // end only @@ -178,7 +177,6 @@ public class InstanceSummaryTest extends BaseTestClass { /** * Adjust multi-cluster process. Submit and schedule it. Get its instances summary. - * TODO: should be complete */ @Test(enabled = true, timeOut = 1200000) public void testSummaryMultiClusterProcess() throws JAXBException, @@ -214,7 +212,6 @@ public class InstanceSummaryTest extends BaseTestClass { /** * Adjust multi-cluster feed. Submit and schedule it. Get its instances summary. - * TODO: should be complete */ @Test(enabled = true, timeOut = 1200000) public void testSummaryMultiClusterFeed() throws JAXBException, ParseException, IOException, http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java index 670f6db..99a42fd 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java @@ -30,7 +30,7 @@ import org.apache.falcon.regression.core.util.AssertUtil; import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.MathUtil; +import org.apache.falcon.regression.core.util.MatrixUtil; import org.apache.falcon.regression.core.util.OSUtil; import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; @@ -64,19 +64,22 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; +/** + * Tests with Retries. + */ @Test(groups = "embedded") public class NewRetryTest extends BaseTestClass { - private static final Logger logger = Logger.getLogger(NewRetryTest.class); - ColoHelper cluster = servers.get(0); - FileSystem clusterFS = serverFS.get(0); - OozieClient clusterOC = serverOC.get(0); + private static final Logger LOGGER = Logger.getLogger(NewRetryTest.class); + private ColoHelper cluster = servers.get(0); + private FileSystem clusterFS = serverFS.get(0); + private OozieClient clusterOC = serverOC.get(0); - DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm"); - final private String baseTestDir = baseHDFSDir + "/NewRetryTest"; - final private String aggregateWorkflowDir = baseTestDir + "/aggregator"; - final private String lateDir = baseTestDir + "/lateDataTest/testFolders"; - final private String latePath = lateDir + MINUTE_DATE_PATTERN; + private DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm"); + private final String baseTestDir = baseHDFSDir + "/NewRetryTest"; + private final String aggregateWorkflowDir = baseTestDir + "/aggregator"; + private final String lateDir = baseTestDir + "/lateDataTest/testFolders"; + private final String latePath = lateDir + MINUTE_DATE_PATTERN; private DateTime startDate; private DateTime endDate; @@ -142,12 +145,12 @@ public class NewRetryTest extends BaseTestClass { int defaultRetries = bundles[0].getProcessObject().getRetry().getAttempts(); - + retry.setAttempts((0)); bundles[0].setRetry(retry); - logger.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); + LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); prism.getProcessHelper() .update((bundles[0].getProcessData()), bundles[0].getProcessData()); String newBundleId = InstanceUtil.getLatestBundleID(cluster, @@ -193,12 +196,12 @@ public class NewRetryTest extends BaseTestClass { Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 1), "Failure Retry validation failed"); - + retry.setAttempts((retry.getAttempts() - 2)); bundles[0].setRetry(retry); - logger.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); + LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); if ((retry.getAttempts() - 2) > 0) { Assert.assertTrue(prism.getProcessHelper() @@ -255,7 +258,7 @@ public class NewRetryTest extends BaseTestClass { bundles[0].setRetry(retry); - logger.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); + LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertTrue(prism.getProcessHelper() .update((bundles[0].getProcessData()), bundles[0].getProcessData()) .getMessage().contains("updated successfully"), @@ -305,12 +308,12 @@ public class NewRetryTest extends BaseTestClass { Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 2), "Failure Retry validation failed"); - + retry.setAttempts((2)); bundles[0].setRetry(retry); - logger.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); + LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertTrue( prism.getProcessHelper() .update((bundles[0].getProcessData()), bundles[0].getProcessData()) @@ -359,7 +362,7 @@ public class NewRetryTest extends BaseTestClass { bundles[0].setRetry(retry); - logger.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); + LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertTrue(prism.getProcessHelper() .update(Util.readEntityName(bundles[0].getProcessData()), null).getMessage() @@ -407,7 +410,7 @@ public class NewRetryTest extends BaseTestClass { bundles[0].setRetry(retry); - logger.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); + LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertTrue( prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData()), bundles[0].getProcessData()).getMessage() @@ -459,7 +462,7 @@ public class NewRetryTest extends BaseTestClass { bundles[0].setRetry(retry); - logger.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); + LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertTrue(prism.getProcessHelper() .update(Util.readEntityName(bundles[0].getProcessData()), bundles[0].getProcessData()).getMessage() @@ -512,7 +515,7 @@ public class NewRetryTest extends BaseTestClass { bundles[0].setRetry(retry); - logger.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); + LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertFalse( prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData()) , bundles[0].getProcessData()).getMessage().contains("updated successfully"), @@ -571,7 +574,7 @@ public class NewRetryTest extends BaseTestClass { @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false) public void testUserRetryWhileAutomaticRetriesHappen(Retry retry) throws Exception { - DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd/hh:mm"); + DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd/hh:mm"); bundles[0].setRetry(retry); @@ -579,7 +582,7 @@ public class NewRetryTest extends BaseTestClass { AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data)); } - logger.info("process dates: " + startDate + "," + endDate); + LOGGER.info("process dates: " + startDate + "," + endDate); //submit and schedule process ServiceResponse response = @@ -606,12 +609,12 @@ public class NewRetryTest extends BaseTestClass { "Failure Retry validation failed"); //now start firing random retries - logger.info("now firing user reruns:"); + LOGGER.info("now firing user reruns:"); for (int i = 0; i < 1; i++) { prism.getProcessHelper() .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()), - "?start=" + formatter.print(startDate).replace("/", "T") + "Z" + - "&end=" + formatter.print(endDate).replace("/", "T") + "Z"); + "?start=" + timeFormatter.print(startDate).replace("/", "T") + "Z" + + "&end=" + timeFormatter.print(endDate).replace("/", "T") + "Z"); } //now to validate all failed instances to check if they were retried or not. validateRetry(clusterOC, bundleId, @@ -625,7 +628,7 @@ public class NewRetryTest extends BaseTestClass { @Test(dataProvider = "DP", groups = {"0.2.2", "retry"}, enabled = false) public void testUserRetryAfterAutomaticRetriesHappen(Retry retry) throws Exception { - DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd/hh:mm"); + DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd/hh:mm"); bundles[0].setRetry(retry); @@ -633,7 +636,7 @@ public class NewRetryTest extends BaseTestClass { AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(data)); } - logger.info("process dates: " + startDate + "," + endDate); + LOGGER.info("process dates: " + startDate + "," + endDate); //submit and schedule process ServiceResponse response = @@ -656,14 +659,14 @@ public class NewRetryTest extends BaseTestClass { validateRetry(clusterOC, bundleId, bundles[0].getProcessObject().getRetry().getAttempts()); - logger.info("now firing user reruns:"); + LOGGER.info("now firing user reruns:"); DateTime[] dateBoundaries = getFailureTimeBoundaries(clusterOC, bundleId); InstancesResult piResult = prism.getProcessHelper() .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()), - "?start=" + formatter.print(dateBoundaries[0]).replace("/", "T") + - "Z" + "&end=" + formatter.print(dateBoundaries[dateBoundaries.length - 1]) - .replace("/", "T") + "Z"); + "?start=" + timeFormatter.print(dateBoundaries[0]).replace("/", "T") + "Z" + + "&end=" + timeFormatter.print(dateBoundaries[dateBoundaries.length - 1]) + .replace("/", "T") + "Z"); AssertUtil.assertSucceeded(piResult); @@ -713,9 +716,9 @@ public class NewRetryTest extends BaseTestClass { Assert.assertNotNull(dates, String .format("Start time for running coordinators of bundle: %s should not be null.", bundleId)); - logger.info("Start time: " + formatter.print(startDate)); - logger.info("End time: " + formatter.print(endDate)); - logger.info("candidate nominal time:" + formatter.print(dates.get(0))); + LOGGER.info("Start time: " + formatter.print(startDate)); + LOGGER.info("End time: " + formatter.print(endDate)); + LOGGER.info("candidate nominal time:" + formatter.print(dates.get(0))); for (int attempt = 0; attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) { @@ -724,17 +727,17 @@ public class NewRetryTest extends BaseTestClass { Assert.assertTrue(validateFailureRetries(clusterOC, bundleId, 1), "Failure Retry validation failed"); - logger.info("now suspending the process altogether...."); + LOGGER.info("now suspending the process altogether...."); AssertUtil.assertSucceeded( cluster.getProcessHelper().suspend(bundles[0].getProcessData())); HashMap<String, Integer> initialMap = getFailureRetriesForEachWorkflow( clusterOC, getDefaultOozieCoordinator(clusterOC, bundleId)); - logger.info("saved state of workflow retries"); + LOGGER.info("saved state of workflow retries"); for (String key : initialMap.keySet()) { - logger.info(key + "," + initialMap.get(key)); + LOGGER.info(key + "," + initialMap.get(key)); } TimeUnit.MINUTES.sleep(10); @@ -742,10 +745,10 @@ public class NewRetryTest extends BaseTestClass { HashMap<String, Integer> finalMap = getFailureRetriesForEachWorkflow( clusterOC, getDefaultOozieCoordinator(clusterOC, bundleId)); - logger.info("final state of process looks like:"); + LOGGER.info("final state of process looks like:"); for (String key : finalMap.keySet()) { - logger.info(key + "," + finalMap.get(key)); + LOGGER.info(key + "," + finalMap.get(key)); } Assert.assertEquals(initialMap.size(), finalMap.size(), @@ -756,7 +759,7 @@ public class NewRetryTest extends BaseTestClass { "values are different for workflow: " + key); } - logger.info("now resuming the process..."); + LOGGER.info("now resuming the process..."); AssertUtil.assertSucceeded( cluster.getProcessHelper().resume(bundles[0].getProcessData())); @@ -814,9 +817,9 @@ public class NewRetryTest extends BaseTestClass { .format("Start time for running coordinators of bundle: %s should not be null.", bundleId)); - logger.info("Start time: " + formatter.print(startDate)); - logger.info("End time: " + formatter.print(endDate)); - logger.info("candidate nominal time:" + formatter.print(dates.get(0))); + LOGGER.info("Start time: " + formatter.print(startDate)); + LOGGER.info("End time: " + formatter.print(endDate)); + LOGGER.info("candidate nominal time:" + formatter.print(dates.get(0))); DateTime now = dates.get(0); if (formatter.print(startDate).compareToIgnoreCase(formatter.print(dates.get(0))) > 0) { @@ -836,7 +839,7 @@ public class NewRetryTest extends BaseTestClass { String insertionFolder = Util.findFolderBetweenGivenTimeStamps(now, now.plusMinutes(5), initialData); - logger.info("inserting data in folder " + insertionFolder + " at " + DateTime.now()); + LOGGER.info("inserting data in folder " + insertionFolder + " at " + DateTime.now()); HadoopUtil.injectMoreData(clusterFS, lateDir + insertionFolder, OSUtil.OOZIE_EXAMPLE_INPUT_DATA + "lateData"); //now to validate all failed instances to check if they were retried or not. @@ -892,9 +895,9 @@ public class NewRetryTest extends BaseTestClass { - (bundles[0].getProcessObject().getRetry().getAttempts()) / 2) ^ 2)); } else { TimeUnit.MINUTES - .sleep(retry.getDelay().getFrequencyAsInt() * - ((bundles[0].getProcessObject().getRetry().getAttempts()) - - (bundles[0].getProcessObject().getRetry().getAttempts()) / 2)); + .sleep(retry.getDelay().getFrequencyAsInt() + * ((bundles[0].getProcessObject().getRetry().getAttempts()) + - (bundles[0].getProcessObject().getRetry().getAttempts()) / 2)); } //now to validate all failed instances to check if they were retried or not. @@ -919,7 +922,7 @@ public class NewRetryTest extends BaseTestClass { for (int i = 0; i < 60 && !validateFailureRetries(oozieClient, bundleId, maxNumberOfRetries); ++i) { - logger.info("desired state not reached, attempt number: " + i); + LOGGER.info("desired state not reached, attempt number: " + i); TimeUtil.sleepSeconds(10); } Assert.assertTrue(validateFailureRetries(oozieClient, bundleId, maxNumberOfRetries), @@ -933,13 +936,13 @@ public class NewRetryTest extends BaseTestClass { if (maxNumberOfRetries < 0) { maxNumberOfRetries = 0; } - logger.info("coordinator: " + coordinator); + LOGGER.info("coordinator: " + coordinator); HashMap<String, Boolean> workflowMap = new HashMap<String, Boolean>(); if (coordinator == null || coordinator.getActions().size() == 0) { return false; } - logger.info("coordinator.getActions(): " + coordinator.getActions()); + LOGGER.info("coordinator.getActions(): " + coordinator.getActions()); for (CoordinatorAction action : coordinator.getActions()) { if (null == action.getExternalId()) { @@ -948,12 +951,12 @@ public class NewRetryTest extends BaseTestClass { WorkflowJob actionInfo = oozieClient.getJobInfo(action.getExternalId()); - logger + LOGGER .info("actionInfo: " + actionInfo + " actionInfo.getRun(): " + actionInfo.getRun()); - if (!(actionInfo.getStatus() == WorkflowJob.Status.SUCCEEDED || - actionInfo.getStatus() == WorkflowJob.Status.RUNNING)) { + if (!(actionInfo.getStatus() == WorkflowJob.Status.SUCCEEDED + || actionInfo.getStatus() == WorkflowJob.Status.RUNNING)) { if (actionInfo.getRun() == maxNumberOfRetries) { workflowMap.put(actionInfo.getId(), true); } else { @@ -996,13 +999,14 @@ public class NewRetryTest extends BaseTestClass { @DataProvider(name = "DP") public Object[][] getData() { - String[] retryTypes = new String[]{"periodic", "exp-backoff"};//,"exp-backoff" - Integer[] delays = new Integer[]{2, - 0};//removing -1 since this should be checked at validation level while setting + String[] retryTypes = new String[]{"periodic", "exp-backoff"}; //,"exp-backoff" + Integer[] delays = new Integer[]{2, 0}; //removing -1 since this should be checked at + // validation level while setting String[] delayUnits = new String[]{"minutes"}; - Integer[] retryAttempts = new Integer[]{2, 0, 3};//0,-1,2 + Integer[] retryAttempts = new Integer[]{2, 0, 3}; //0,-1,2 - Object[][] crossProd = MathUtil.crossProduct(delays, delayUnits, retryTypes, retryAttempts); + Object[][] crossProd = MatrixUtil + .crossProduct(delays, delayUnits, retryTypes, retryAttempts); Object[][] testData = new Object[crossProd.length][1]; for (int i = 0; i < crossProd.length; ++i) { final Integer delay = (Integer) crossProd[i][0]; @@ -1060,7 +1064,7 @@ public class NewRetryTest extends BaseTestClass { } WorkflowJob actionInfo = oozieClient.getJobInfo(action.getExternalId()); - logger.info("adding workflow " + actionInfo.getId() + " to the map"); + LOGGER.info("adding workflow " + actionInfo.getId() + " to the map"); workflowRetryMap.put(actionInfo.getId(), actionInfo.getRun()); } return workflowRetryMap; @@ -1085,12 +1089,12 @@ public class NewRetryTest extends BaseTestClass { return dateList.toArray(new DateTime[dateList.size()]); } - private void checkIfRetriesWereTriggeredCorrectly(ColoHelper coloHelper, Retry retry, + private void checkIfRetriesWereTriggeredCorrectly(ColoHelper coloHelper, Retry retry, String bundleId) throws Exception { //it is presumed that this delay here will be expressed in minutes. Hourly/daily is // unfeasible to check :) - final DateTimeFormatter formatter = DateTimeFormat.forPattern("HH:mm:ss"); + final DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("HH:mm:ss"); final OozieClient oozieClient = coloHelper.getFeedHelper().getOozieClient(); final CoordinatorJob coordinator = getDefaultOozieCoordinator(oozieClient, bundleId); @@ -1105,38 +1109,37 @@ public class NewRetryTest extends BaseTestClass { List<String> instanceFinishTimes = Util.getInstanceFinishTimes(coloHelper, action.getExternalId()); - logger.info("finish times look like:"); + LOGGER.info("finish times look like:"); for (String line : instanceFinishTimes) { - logger.info(line); + LOGGER.info(line); } - logger.info("retry times look like:"); + LOGGER.info("retry times look like:"); for (String line : instanceRetryTimes) { - logger.info(line); + LOGGER.info(line); } - logger.info("checking timelines for retry type " + retry.getPolicy().value() + - " for delay " + expectedDelay + " for workflow id: " +action.getExternalId()); + LOGGER.info("checking timelines for retry type " + retry.getPolicy().value() + + " for delay " + expectedDelay + " for workflow id: " + action.getExternalId()); if (retry.getPolicy() == PolicyType.PERIODIC) { //in this case the delay unit will always be a constant time diff for (int i = 0; i < instanceFinishTimes.size() - 1; i++) { - DateTime temp = formatter.parseDateTime(instanceFinishTimes.get(i)); + DateTime temp = timeFormatter.parseDateTime(instanceFinishTimes.get(i)); Assert.assertEquals(temp.plusMinutes(expectedDelay).getMillis(), - formatter.parseDateTime(instanceRetryTimes.get(i)).getMillis(), - 5000, "oops! this is out of expected delay range for workflow id " + - action.getExternalId()); + timeFormatter.parseDateTime(instanceRetryTimes.get(i)).getMillis(), + 5000, "oops! this is out of expected delay range for workflow id " + + action.getExternalId()); } } else { //check for exponential for (int i = 0; i < instanceFinishTimes.size() - 1; i++) { - DateTime temp = formatter.parseDateTime(instanceFinishTimes.get(i)); + DateTime temp = timeFormatter.parseDateTime(instanceFinishTimes.get(i)); Assert.assertEquals(temp.plusMinutes(expectedDelay).getMillis(), - formatter.parseDateTime(instanceRetryTimes.get(i)).getMillis(), - 5000, - "oops! this is out of expected delay range for workflow id " + - action.getExternalId()); + timeFormatter.parseDateTime(instanceRetryTimes.get(i)).getMillis(), + 5000, "oops! this is out of expected delay range for workflow id " + + action.getExternalId()); expectedDelay *= 2; } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java index 871e8dc..7a4fee4 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java @@ -53,17 +53,17 @@ import java.util.List; @Test(groups = "embedded") public class NoOutputProcessTest extends BaseTestClass { - ColoHelper cluster = servers.get(0); - FileSystem clusterFS = serverFS.get(0); - OozieClient clusterOC = serverOC.get(0); - String testDir = baseHDFSDir + "/NoOutputProcessTest"; - String inputPath = testDir + "/input" + MINUTE_DATE_PATTERN; - String workflowForNoIpOp = baseHDFSDir + "/PrismProcessScheduleTest/noInOp"; - private static final Logger logger = Logger.getLogger(NoOutputProcessTest.class); + private ColoHelper cluster = servers.get(0); + private FileSystem clusterFS = serverFS.get(0); + private OozieClient clusterOC = serverOC.get(0); + private String testDir = baseHDFSDir + "/NoOutputProcessTest"; + private String inputPath = testDir + "/input" + MINUTE_DATE_PATTERN; + private String workflowForNoIpOp = baseHDFSDir + "/PrismProcessScheduleTest/noInOp"; + private static final Logger LOGGER = Logger.getLogger(NoOutputProcessTest.class); @BeforeClass(alwaysRun = true) public void createTestData() throws Exception { - logger.info("in @BeforeClass"); + LOGGER.info("in @BeforeClass"); uploadDirToClusters(workflowForNoIpOp, OSUtil.RESOURCES + "workflows/aggregatorNoOutput/"); Bundle b = BundleUtil.readELBundle(); b.generateUniqueBundle(); @@ -79,7 +79,7 @@ public class NoOutputProcessTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void testName(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); bundles[0] = BundleUtil.readELBundle(); bundles[0].generateUniqueBundle(); bundles[0] = new Bundle(bundles[0], cluster); @@ -106,7 +106,7 @@ public class NoOutputProcessTest extends BaseTestClass { */ @Test(enabled = true, groups = {"singleCluster"}) public void checkForJMSMsgWhenNoOutput() throws Exception { - logger.info("attaching messageConsumer to: " + "FALCON.ENTITY.TOPIC"); + LOGGER.info("attaching messageConsumer to: " + "FALCON.ENTITY.TOPIC"); JmsMessageConsumer messageConsumer = new JmsMessageConsumer("FALCON.ENTITY.TOPIC", cluster.getClusterHelper().getActiveMQ()); messageConsumer.start(); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java index 7a232ba..79c89df 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java @@ -7,14 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.falcon.regression; @@ -48,6 +47,9 @@ import org.testng.annotations.Test; import java.io.IOException; import java.lang.reflect.Method; +/** + * Test process with different frequency combinations. + */ public class ProcessFrequencyTest extends BaseTestClass { private static final Logger LOGGER = Logger.getLogger(ProcessFrequencyTest.class); private ColoHelper cluster = servers.get(0); @@ -91,13 +93,13 @@ public class ProcessFrequencyTest extends BaseTestClass { */ @Test(dataProvider = "generateProcessFrequencies") public void testProcessWithFrequency(final FreqType freqType, final int freqAmount) - throws Exception { + throws Exception { final String startDate = "2010-01-02T01:00Z"; final String endDate = "2010-01-02T01:01Z"; final String inputPath = baseTestHDFSDir + "/input/"; bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue()); bundles[0].setOutputFeedLocationData( - baseTestHDFSDir + "/output-data/" + freqType.getPathValue()); + baseTestHDFSDir + "/output-data/" + freqType.getPathValue()); bundles[0].setProcessPeriodicity(freqAmount, freqType.getFalconTimeUnit()); bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)"); bundles[0].setProcessValidity(startDate, endDate); @@ -106,13 +108,13 @@ public class ProcessFrequencyTest extends BaseTestClass { //upload data HadoopUtil.deleteDirIfExists(inputPath, clusterFS); final String startPath = inputPath + freqType.getFormatter().print( - TimeUtil.oozieDateToDate(startDate)); + TimeUtil.oozieDateToDate(startDate)); HadoopUtil.copyDataToFolder(clusterFS, startPath, OSUtil.NORMAL_INPUT); final String processName = Util.readEntityName(bundles[0].getProcessData()); //InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5); + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5); InstancesResult r = prism.getProcessHelper().getRunningInstance(processName); InstanceUtil.validateSuccessWOInstances(r); } @@ -120,10 +122,10 @@ public class ProcessFrequencyTest extends BaseTestClass { @DataProvider(name = "generateProcessFrequencies") public Object[][] generateProcessFrequencies() { return new Object[][] { - {FreqType.MINUTELY, 2}, - {FreqType.HOURLY, 3}, - {FreqType.DAILY, 5}, - {FreqType.MONTHLY, 7}, + {FreqType.MINUTELY, 2, }, + {FreqType.HOURLY, 3, }, + {FreqType.DAILY, 5, }, + {FreqType.MONTHLY, 7, }, }; } @@ -133,14 +135,14 @@ public class ProcessFrequencyTest extends BaseTestClass { */ @Test public void testProcessWithBadFrequency() - throws Exception { + throws Exception { final String startDate = "2010-01-02T01:00Z"; final String endDate = "2010-01-02T01:01Z"; final String inputPath = baseTestHDFSDir + "/input/"; final FreqType freqType = FreqType.MINUTELY; bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue()); bundles[0].setOutputFeedLocationData( - baseTestHDFSDir + "/output-data/" + freqType.getPathValue()); + baseTestHDFSDir + "/output-data/" + freqType.getPathValue()); bundles[0].submitClusters(prism); bundles[0].submitFeeds(prism); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java index 7d7179c..85d7134 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java @@ -18,12 +18,13 @@ package org.apache.falcon.regression; +import org.apache.falcon.regression.Entities.FeedMerlin; +import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.ActionType; import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.response.ServiceResponse; import org.apache.falcon.regression.core.util.AssertUtil; import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.HadoopUtil; @@ -31,7 +32,6 @@ import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.core.util.XmlUtil; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.falcon.resource.InstancesResult; import org.apache.hadoop.fs.FileSystem; @@ -112,21 +112,9 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass { //set source and target for the 2 feeds //set clusters to null; - feed01 = InstanceUtil - .setFeedCluster(feed01, - XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"), - XmlUtil.createRetention("days(10000)", ActionType.DELETE), null, - ClusterType.SOURCE, null); - feed02 = InstanceUtil - .setFeedCluster(feed02, - XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"), - XmlUtil.createRetention("days(10000)", ActionType.DELETE), null, - ClusterType.SOURCE, null); - outputFeed = InstanceUtil - .setFeedCluster(outputFeed, - XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"), - XmlUtil.createRetention("days(10000)", ActionType.DELETE), null, - ClusterType.SOURCE, null); + feed01 = FeedMerlin.fromString(feed01).clearFeedClusters().toString(); + feed02 = FeedMerlin.fromString(feed02).clearFeedClusters().toString(); + outputFeed = FeedMerlin.fromString(outputFeed).clearFeedClusters().toString(); //set new feed input data feed01 = Util.setFeedPathValue(feed01, String.format(feedPath, 1)); @@ -147,133 +135,119 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass { String startTime = TimeUtil.getTimeWrtSystemTime(-70); //set clusters for feed01 - feed01 = InstanceUtil - .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("days(10000)", ActionType.DELETE), - Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, null); - feed01 = InstanceUtil - .setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("days(10000)", ActionType.DELETE), - Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null); + feed01 = FeedMerlin.fromString(feed01).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(10000)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .build()).toString(); + feed01 = FeedMerlin.fromString(feed01).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(10000)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.TARGET) + .build()).toString(); //set clusters for feed02 - feed02 = InstanceUtil - .setFeedCluster(feed02, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("days(10000)", ActionType.DELETE), - Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, null); - feed02 = InstanceUtil - .setFeedCluster(feed02, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("days(10000)", ActionType.DELETE), - Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE, null); + feed02 = FeedMerlin.fromString(feed02).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(10000)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.TARGET) + .build()).toString(); + feed02 = FeedMerlin.fromString(feed02).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(10000)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .build()).toString(); //set clusters for output feed - outputFeed = InstanceUtil.setFeedCluster(outputFeed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("days(10000)", ActionType.DELETE), - Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, null); - outputFeed = InstanceUtil.setFeedCluster(outputFeed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("days(10000)", ActionType.DELETE), - Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null); + outputFeed = FeedMerlin.fromString(outputFeed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(10000)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .build()).toString(); + outputFeed = FeedMerlin.fromString(outputFeed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(10000)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.TARGET) + .build()).toString(); //submit and schedule feeds - LOGGER.info("feed01: " + Util.prettyPrintXml(feed01)); - LOGGER.info("feed02: " + Util.prettyPrintXml(feed02)); - LOGGER.info("outputFeed: " + Util.prettyPrintXml(outputFeed)); - - ServiceResponse r = prism.getFeedHelper().submitAndSchedule(feed01); - AssertUtil.assertSucceeded(r); - r = prism.getFeedHelper().submitAndSchedule(feed02); - AssertUtil.assertSucceeded(r); - r = prism.getFeedHelper().submitAndSchedule(outputFeed); - AssertUtil.assertSucceeded(r); - - //create a process with 2 clusters - - //get a process - String process = bundles[0].getProcessData(); - - //add clusters to process + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed01)); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed02)); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(outputFeed)); String processStartTime = TimeUtil.getTimeWrtSystemTime(-16); // String processEndTime = InstanceUtil.getTimeWrtSystemTime(20); - process = InstanceUtil - .setProcessCluster(process, null, - XmlUtil.createProcessValidity(startTime, "2099-01-01T00:00Z")); - process = InstanceUtil - .setProcessCluster(process, Util.readEntityName(bundles[0].getClusters().get(0)), - XmlUtil.createProcessValidity(processStartTime, - TimeUtil.addMinsToTime(processStartTime, 35))); - process = InstanceUtil - .setProcessCluster(process, Util.readEntityName(bundles[1].getClusters().get(0)), - XmlUtil.createProcessValidity( - TimeUtil.addMinsToTime(processStartTime, 16), - TimeUtil.addMinsToTime(processStartTime, 45))); - process = InstanceUtil - .addProcessInputFeed(process, Util.readEntityName(feed02), - Util.readEntityName(feed02)); + String process = bundles[0].getProcessData(); + process = ProcessMerlin.fromString(process).clearProcessCluster().toString(); + process = ProcessMerlin.fromString(process).addProcessCluster( + new ProcessMerlin.ProcessClusterBuilder( + Util.readEntityName(bundles[0].getClusters().get(0))) + .withValidity(processStartTime, TimeUtil.addMinsToTime(processStartTime, 35)) + .build()) + .toString(); + process = ProcessMerlin.fromString(process).addProcessCluster( + new ProcessMerlin.ProcessClusterBuilder( + Util.readEntityName(bundles[1].getClusters().get(0))) + .withValidity(TimeUtil.addMinsToTime(processStartTime, 16), + TimeUtil.addMinsToTime(processStartTime, 45)) + .build()) + .toString(); + process = InstanceUtil.addProcessInputFeed(process, Util.readEntityName(feed02), + Util.readEntityName(feed02)); //submit and schedule process - LOGGER.info("process: " + Util.prettyPrintXml(process)); - prism.getProcessHelper().submitAndSchedule(process); LOGGER.info("Wait till process goes into running "); - InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(process), 1, Status.RUNNING, EntityType.PROCESS); InstanceUtil.waitTillInstanceReachState(serverOC.get(1), Util.getProcessName(process), 1, Status.RUNNING, EntityType.PROCESS); - InstancesResult responseInstance = prism.getProcessHelper() - .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()), - "?start=" + processStartTime + "&end=" + TimeUtil - .addMinsToTime(processStartTime, 45)); + + final String processName = Util.readEntityName(bundles[0].getProcessData()); + InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus( + processName, "?start=" + processStartTime + + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper() - .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()), - "?start=" + TimeUtil - .addMinsToTime(processStartTime, 37) + "&end=" - + TimeUtil.addMinsToTime(processStartTime, 44)); + responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(processName, + "?start=" + TimeUtil.addMinsToTime(processStartTime, 37) + + "&end=" + TimeUtil.addMinsToTime(processStartTime, 44)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper() - .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()), - "?start=" + TimeUtil - .addMinsToTime(processStartTime, 37) + "&end=" - + TimeUtil.addMinsToTime(processStartTime, 44)); + responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName, + "?start=" + TimeUtil.addMinsToTime(processStartTime, 37) + + "&end=" + TimeUtil.addMinsToTime(processStartTime, 44)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper() - .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()), - "?start=" + processStartTime + "&end=" + TimeUtil - .addMinsToTime(processStartTime, 7)); + responseInstance = prism.getProcessHelper().getProcessInstanceResume(processName, + "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper() - .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()), - "?start=" + TimeUtil - .addMinsToTime(processStartTime, 16) + "&end=" - + TimeUtil.addMinsToTime(processStartTime, 45)); + responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName, + "?start=" + TimeUtil.addMinsToTime(processStartTime, 16) + + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = cluster1.getProcessHelper() - .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()), - "?start=" + processStartTime + "&end=" + TimeUtil - .addMinsToTime(processStartTime, 7)); + responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(processName, + "?start=" + processStartTime + "&end="+ TimeUtil.addMinsToTime(processStartTime, 7)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper() - .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()), - "?start=" + processStartTime + "&end=" + TimeUtil - .addMinsToTime(processStartTime, 7)); + responseInstance = prism.getProcessHelper().getProcessInstanceRerun(processName, + "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java index 5bbba8f..6f88103 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java @@ -66,8 +66,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass { private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN; private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN; - private String feedInputTimedOutPath = baseTestHDFSDir + "/timedoutStatus" + - MINUTE_DATE_PATTERN; + private String feedInputTimedOutPath = baseTestHDFSDir + "/timedoutStatus" + + MINUTE_DATE_PATTERN; private String feedOutputTimedOutPath = baseTestHDFSDir + "/output-data/timedoutStatus" + MINUTE_DATE_PATTERN; private static final Logger LOGGER = Logger.getLogger(ProcessInstanceStatusTest.class); @@ -86,7 +86,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass { } /** - * Configures general process definition which particular properties can be overwritten + * Configures general process definition which particular properties can be overwritten. */ @BeforeMethod(alwaysRun = true) public void setup(Method method) throws Exception { @@ -168,8 +168,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass { */ @Test(groups = {"singleCluster"}) public void testProcessInstanceStatusDateEmpty() - throws JAXBException, AuthenticationException, IOException, URISyntaxException, - OozieClientException, InterruptedException { + throws JAXBException, AuthenticationException, IOException, URISyntaxException, + OozieClientException, InterruptedException { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:30Z"); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism);