Repository: falcon Updated Branches: refs/heads/master 10ee01a85 -> 237bab6ed
FALCON-1567 Test case for Lifecycle feature Author: Pragya <[email protected]> Reviewers: Paul Isaychuk <[email protected]> Closes #43 from pragya-mittal/FALCON-1567 and squashes the following commits: 9d46ea4 [Pragya] FALCON-1567 Test case for Lifecycle feature cc84d5a [Pragya] Merge branch 'master' of https://github.com/apache/falcon f037385 [Pragya] Merge branch 'master' of https://github.com/apache/falcon 4c19ec0 [Pragya] Merge branch 'master' of https://github.com/apache/falcon 3b7fd63 [Pragya] FALCON-1829 Add regression for submit and schedule process on native scheduler (time based) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/237bab6e Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/237bab6e Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/237bab6e Branch: refs/heads/master Commit: 237bab6edcefa54b1c3818d8d4f81039b893d637 Parents: 10ee01a Author: Pragya <[email protected]> Authored: Wed Feb 17 10:45:39 2016 +0530 Committer: Pragya Mittal <[email protected]> Committed: Wed Feb 17 10:45:39 2016 +0530 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 2 + .../falcon/regression/core/util/AssertUtil.java | 12 + .../falcon/regression/core/util/OozieUtil.java | 34 ++- .../falcon/regression/prism/RetentionTest.java | 301 ++++++++++++++++++- 4 files changed, 332 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 77772a0..566f7e1 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -5,6 +5,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1567 Test case for Lifecycle feature (Pragya Mittal) + FALCON-1784 Add regression test for for FALCON-1647 (Paul Isaychuk) FALCON-1829 Add regression for submit and schedule process on native scheduler (time based) (Pragya Mittal) http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java index 0af0c1e..cb79e9c 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java @@ -299,6 +299,18 @@ public final class AssertUtil { } /** + * Checks that ServiceResponse status is status FAILED with expectedMessage. + * + * @param response ServiceResponse + * @param expectedMessage expected message + * @throws JAXBException + */ + public static void assertFailedWithMessage(ServiceResponse response, String expectedMessage) throws JAXBException { + assertFailed(response); + Assert.assertTrue(response.getMessage().contains(expectedMessage), "Incorrect message in response"); + } + + /** * Checks that Instance/Triage result status is FAILED. * * @param response APIResult response http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java index e73bc5d..4c609b3 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java @@ -30,6 +30,7 @@ import org.apache.oozie.client.Job; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; import org.joda.time.DateTime; import org.apache.log4j.Logger; import org.joda.time.DateTimeZone; @@ -754,8 +755,8 @@ public final class OozieUtil { * Returns configuration object of a given bundleID for a given instanceTime. * * @param oozieClient oozie client of cluster job is running on - * @param bundleID name of process which job is being analyzed - * @param time job status we are waiting for + * @param bundleID bundleID of given cluster + * @param time instanceTime * @throws org.apache.oozie.client.OozieClientException * @throws org.json.JSONException */ @@ -822,4 +823,33 @@ public final class OozieUtil { } return counter == propMap.size(); } + + /** + * Returns configuration object of a given bundleID for a given retentionFeed. + * + * @param oozieClient oozie client of cluster job is running on + * @param bundleID bundleID of given cluster + * @throws OozieClientException + */ + public static Configuration getRetentionConfiguration(OozieClient oozieClient, String bundleID) + throws OozieClientException { + waitForCoordinatorJobCreation(oozieClient, bundleID); + CoordinatorJob coord = null; + List<CoordinatorJob> coordJobs = oozieClient.getBundleJobInfo(bundleID).getCoordinators(); + for (CoordinatorJob coordinatorJob : coordJobs) { + if (coordinatorJob.getAppName().startsWith("FALCON_FEED_RETENTION")) { + coord = oozieClient.getCoordJobInfo(coordinatorJob.getId()); + } + } + + Configuration configuration = new Configuration(); + if (coord != null) { + WorkflowJob wid = oozieClient.getJobInfo(coord.getActions().get(0).getExternalId()); + configuration.addResource(new ByteArrayInputStream(wid.getConf().getBytes())); + } else { + configuration = null; + } + + return configuration; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java index 018b83a..b677433 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java @@ -20,6 +20,12 @@ package org.apache.falcon.regression.prism; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.feed.LateArrival; +import org.apache.falcon.entity.v0.feed.Lifecycle; +import org.apache.falcon.entity.v0.feed.Properties; +import org.apache.falcon.entity.v0.feed.Property; +import org.apache.falcon.entity.v0.feed.RetentionStage; import org.apache.falcon.regression.Entities.FeedMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.enumsAndConstants.FreqType; @@ -36,14 +42,17 @@ import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.log4j.Logger; +import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.json.JSONException; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -110,7 +119,7 @@ public class RetentionTest extends BaseTestClass { */ @Test(groups = {"0.1", "0.2", "prism"}, dataProvider = "betterDP", priority = -1) public void testRetention(final int retentionPeriod, final RetentionUnit retentionUnit, - final boolean gaps, final FreqType freqType, final boolean withData) throws Exception { + final boolean gaps, final FreqType freqType, final boolean withData) throws Exception { bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue()); final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle()); feedObject.setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")"); @@ -122,7 +131,7 @@ public class RetentionTest extends BaseTestClass { replenishData(freqType, gaps, withData); commonDataRetentionWorkflow(feedObject.toString(), freqType, retentionUnit, - retentionPeriod); + retentionPeriod); } else { AssertUtil.assertFailed(response); } @@ -143,7 +152,7 @@ public class RetentionTest extends BaseTestClass { } final DateTime today = new DateTime(DateTimeZone.UTC); final List<DateTime> times = TimeUtil.getDatesOnEitherSide( - freqType.addTime(today, -36), freqType.addTime(today, -1), skip, freqType); + freqType.addTime(today, -36), freqType.addTime(today, -1), skip, freqType); final List<String> dataDates = TimeUtil.convertDatesToString(times, freqType.getFormatter()); LOGGER.info("dataDates = " + dataDates); dataDates.add(HadoopUtil.SOMETHING_RANDOM); @@ -169,7 +178,7 @@ public class RetentionTest extends BaseTestClass { * @throws JMSException */ private void commonDataRetentionWorkflow(String feed, FreqType freqType, - RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException, + RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException, IOException, URISyntaxException, AuthenticationException, JMSException, InterruptedException { //get Data created in the cluster @@ -187,7 +196,7 @@ public class RetentionTest extends BaseTestClass { List<String> workflows = OozieUtil.waitForRetentionWorkflowToSucceed(bundleId, clusterOC); //get current time minus duration of last status check - to get actual time when eviction has started - final DateTime currentTime = new DateTime(new DateTime(DateTimeZone.UTC).toDate().getTime() - 10000); + final DateTime currentTime = new DateTime(DateTimeZone.UTC).minus(10000); LOGGER.info("Current time is " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentTime.toDate())); LOGGER.info("workflows: " + workflows); messageConsumer.interrupt(); @@ -198,7 +207,7 @@ public class RetentionTest extends BaseTestClass { //now see if retention value was matched to as expected List<String> expectedOutput = filterDataOnRetention(initialData, currentTime, retentionUnit, - retentionPeriod, freqType); + retentionPeriod, freqType); LOGGER.info("initialData = " + initialData); LOGGER.info("finalData = " + finalData); LOGGER.info("expectedOutput = " + expectedOutput); @@ -207,10 +216,10 @@ public class RetentionTest extends BaseTestClass { missingData.removeAll(expectedOutput); validateDataFromFeedQueue(feedName, messageConsumer.getReceivedMessages(), missingData); Assert.assertEquals(finalData.size(), expectedOutput.size(), - "Expected and actual sizes of retained data are different! Please check."); + "Expected and actual sizes of retained data are different! Please check."); Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]), - expectedOutput.toArray(new String[expectedOutput.size()]))); + expectedOutput.toArray(new String[expectedOutput.size()]))); //check that root directory exists Assert.assertTrue(clusterFS.exists(new Path(testHDFSDir)), "Base data directory should be present."); @@ -227,7 +236,7 @@ public class RetentionTest extends BaseTestClass { * @throws JMSException */ private void validateDataFromFeedQueue(String feedName, List<MapMessage> messages, - List<String> missingData) throws OozieClientException, JMSException { + List<String> missingData) throws OozieClientException, JMSException { //just verify that each element in queue is same as deleted data! List<String> workflowIds = OozieUtil.getWorkflowJobs(clusterOC, OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0)); @@ -248,10 +257,10 @@ public class RetentionTest extends BaseTestClass { } } Assert.assertEquals(deletedFolders.size(), missingData.size(), - "Output size is different than expected!"); + "Output size is different than expected!"); Assert.assertTrue(Arrays.deepEquals(missingData.toArray(new String[missingData.size()]), - deletedFolders.toArray(new String[deletedFolders.size()])), - "The missing data and message for delete operation don't correspond"); + deletedFolders.toArray(new String[deletedFolders.size()])), + "The missing data and message for delete operation don't correspond"); } /** @@ -265,7 +274,7 @@ public class RetentionTest extends BaseTestClass { * @return list of data folders which are expected to be present on cluster */ private List<String> filterDataOnRetention(List<String> inputData, DateTime currentTime, - RetentionUnit retentionUnit, int retentionPeriod, FreqType freqType) { + RetentionUnit retentionUnit, int retentionPeriod, FreqType freqType) { final List<String> finalData = new ArrayList<>(); //end date is today's date final String startLimit = freqType.getFormatter().print( @@ -289,8 +298,8 @@ public class RetentionTest extends BaseTestClass { // a negative value like -4 should be covered in validation scenarios. Integer[] retentionPeriods = new Integer[]{0, 10080, 60, 8, 24}; RetentionUnit[] retentionUnits = new RetentionUnit[]{ - RetentionUnit.HOURS, - RetentionUnit.DAYS, + RetentionUnit.HOURS, + RetentionUnit.DAYS, }; // "minutes","hours", "days", Boolean[] gaps = new Boolean[]{false, true}; FreqType[] freqTypes = new FreqType[]{FreqType.DAILY, FreqType.YEARLY, FreqType.MONTHLY}; @@ -298,4 +307,266 @@ public class RetentionTest extends BaseTestClass { return MatrixUtil.crossProduct(retentionPeriods, retentionUnits, gaps, freqTypes, withData); } + + /** + * Submit a feed having minutely lifecycle frequency. + * It would fail since lifecycle retention frequency has to be >= 1 hour. + */ + @Test + public void testTooFrequentRetentionLifecycleStage() throws Exception { + String startTime = TimeUtil.getTimeWrtSystemTime(0); + String endTime = TimeUtil.addMinsToTime(startTime, 120); + + LateArrival lateArrival = new LateArrival(); + lateArrival.setCutOff(new Frequency("1", Frequency.TimeUnit.minutes)); + + FreqType freqType = FreqType.MINUTELY; + Frequency retentionPeriodGlobal=new Frequency("30", Frequency.TimeUnit.minutes); + Frequency retentionFrequencyGlobal=new Frequency("15", Frequency.TimeUnit.minutes); + + bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue()); + final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle()); + feedObject.setLateArrival(lateArrival); + feedObject.setValidity(startTime, endTime); + feedObject.setFrequency(new Frequency("minutes(10)")); + feedObject.setRetentionValue("minutes(10)"); + + feedObject.setLifecycle(createLifecycle(retentionPeriodGlobal, retentionFrequencyGlobal, + "", "", true)); + + final ServiceResponse response = prism.getFeedHelper().submitEntity(feedObject.toString()); + AssertUtil.assertFailedWithMessage(response, "Feed Retention can not be more frequent than hours(1)"); + + } + + + /** + * Submits and schedules a feed with lifecycle tag at cluster and global level. + * Responses are checked and retention is validated correspondingly. + * Uses lifecycleDPFail dataProvider to handle possible scenarios. + * + * @param globalLevel : boolean (whether lifecycle is enabled for global level or not) + * @param clusterLevel : boolean (whether lifecycle is enabled for cluster level or not) + */ + @Test(dataProvider = "lifecycleDPFail") + public void clusterGlobalNoRetentionStageTest(boolean globalLevel, boolean clusterLevel) throws Exception { + + String startTime = TimeUtil.getTimeWrtSystemTime(0); + String endTime = TimeUtil.addMinsToTime(startTime, 120); + + LateArrival lateArrival = new LateArrival(); + lateArrival.setCutOff(new Frequency("1", Frequency.TimeUnit.hours)); + + final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle()); + feedObject.setLateArrival(lateArrival); + feedObject.setValidity(startTime, endTime); + + if (globalLevel) { + feedObject.setLifecycle(new Lifecycle()); + } + if (clusterLevel) { + feedObject.getClusters().getClusters().get(0).setLifecycle(new Lifecycle()); + } + + final ServiceResponse response = prism.getFeedHelper().submitEntity(feedObject.toString()); + AssertUtil.assertFailedWithMessage(response, "Retention is a mandatory stage, didn't find it for cluster"); + + } + + @DataProvider(name = "lifecycleDPFail") + public Object[][] getLifecycleFail() { + return new Object[][]{ + {true, true}, // cluster/global : No retention stage. Should fail. + {true, false}, // global : no retention stage. Should fail. + {false, true}, // cluster : no retention stage.Should fail. + }; + } + + /** + * Submits and schedules a feed with lifecycle tag at cluster and global level. + * Responses are checked and retention is validated correspondingly. + * Uses getLifecycleWithGlobalStage dataProvider to handle possible scenarios. + * + * @param globalLevel : boolean (whether lifecycle is enabled for global level or not) + * @param clusterLevel : boolean (whether lifecycle is enabled for cluster level or not) + */ + @Test(dataProvider = "getLifecycleWithGlobalStage") + public void retentionStageFromGlobalTest(boolean globalLevel, boolean clusterLevel) throws Exception { + + String startTime = TimeUtil.getTimeWrtSystemTime(0); + String endTime = TimeUtil.addMinsToTime(startTime, 120); + + FreqType freqType = FreqType.HOURLY; + Frequency retentionPeriodGlobal=new Frequency("2", Frequency.TimeUnit.hours); + Frequency retentionFrequencyGlobal=new Frequency("1", Frequency.TimeUnit.hours); + + String priorityGlobal = "HIGH"; + String queue = "default"; + + LateArrival lateArrival = new LateArrival(); + lateArrival.setCutOff(new Frequency("1", Frequency.TimeUnit.hours)); + + bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue()); + final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle()); + feedObject.setLateArrival(lateArrival); + feedObject.setValidity(startTime, endTime); + + if (globalLevel) { + feedObject.setLifecycle(createLifecycle(retentionPeriodGlobal, retentionFrequencyGlobal, + priorityGlobal, queue, true)); + } + + if (clusterLevel) { + feedObject.getClusters().getClusters().get(0).setLifecycle(new Lifecycle()); + } + + replenishData(freqType, false, false); + + final ServiceResponse response = prism.getFeedHelper().submitEntity(feedObject.toString()); + + AssertUtil.assertSucceeded(response); + commonDataRetentionWorkflow(feedObject.toString(), freqType, RetentionUnit.HOURS, + retentionPeriodGlobal.getFrequencyAsInt()); + validateFrequency(feedObject.getName(), retentionFrequencyGlobal.getFrequencyAsInt()*60); + validatePriorityAndQueue(feedObject.getName(), priorityGlobal, queue); + + } + + @DataProvider(name = "getLifecycleWithGlobalStage") + public Object[][] getLifecycleWithGlobalStage() { + return new Object[][]{ + {true, false}, // Global level lifecycle. Should pass. + {true, true}, // Cluster level : no retention stage - (pick from global). Should pass. + + }; + } + + /** + * Submits and schedules a feed with lifecycle tag at cluster and global level. + * Responses are checked and retention is validated correspondingly. + * Uses getLifecycleWithClusterStage dataProvider to handle possible scenarios. + * + * @param globalLevel : boolean (whether lifecycle is enabled for global level or not) + * @param globalWithStage : boolean (whether global lifecycle has retention stage defined or not) + */ + @Test(dataProvider = "getLifecycleWithClusterStage") + public void retentionStageFromClusterTest(boolean globalLevel, boolean globalWithStage) throws Exception { + + String startTime = TimeUtil.getTimeWrtSystemTime(0); + String endTime = TimeUtil.addMinsToTime(startTime, 120); + + FreqType freqType = FreqType.HOURLY; + Frequency retentionPeriodGlobal=new Frequency("2", Frequency.TimeUnit.hours); + Frequency retentionFrequencyGlobal=new Frequency("1", Frequency.TimeUnit.hours); + + Frequency retentionPeriodCluster=new Frequency("4", Frequency.TimeUnit.hours); + Frequency retentionFrequencyCluster=new Frequency("3", Frequency.TimeUnit.hours); + + String priorityGlobal = "HIGH"; + String priorityCluster = "LOW"; + String queue = "default"; + + LateArrival lateArrival = new LateArrival(); + lateArrival.setCutOff(new Frequency("1", Frequency.TimeUnit.hours)); + + bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue()); + final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle()); + feedObject.setLateArrival(lateArrival); + feedObject.setValidity(startTime, endTime); + + if (globalLevel) { + feedObject.setLifecycle(createLifecycle(retentionPeriodGlobal, retentionFrequencyGlobal, + priorityGlobal, queue, globalWithStage)); + } + + feedObject.getClusters().getClusters().get(0).setLifecycle(createLifecycle(retentionPeriodCluster, + retentionFrequencyCluster, priorityCluster, queue, true)); + + replenishData(freqType, false, false); + + final ServiceResponse response = prism.getFeedHelper().submitEntity(feedObject.toString()); + + AssertUtil.assertSucceeded(response); + commonDataRetentionWorkflow(feedObject.toString(), freqType, RetentionUnit.HOURS, + retentionPeriodCluster.getFrequencyAsInt()); + validateFrequency(feedObject.getName(), retentionFrequencyCluster.getFrequencyAsInt()*60); + validatePriorityAndQueue(feedObject.getName(), priorityCluster, queue); + + } + + @DataProvider(name = "getLifecycleWithClusterStage") + public Object[][] getLifecycleWithClusterStage() { + return new Object[][]{ + + {true, true}, // Cluster level lifecylce. Should pass. + {false, false}, // Cluster level with no global level lifecylce. Should pass. + {true, false}, // Cluster level with empty global level lifecycle.Should pass. + + }; + } + + /** + * Method to create lifecycle tag to be used by feed for lifecycle retention. + * @param retentionPeriod : lifecycle retention period. + * @param retentionFrequency : lifecycle retention frequency. + * @param priority : lifecycle retention priority. + * @param queue : lifecycle retention queue. + */ + private Lifecycle createLifecycle(Frequency retentionPeriod, Frequency retentionFrequency, + String priority, String queue, boolean stage) { + Lifecycle lifecycle = new Lifecycle(); + if (stage) { + String LIMIT_PROPERTY_NAME = "retention.policy.agebaseddelete.limit"; + Property property = new Property(); + property.setName(LIMIT_PROPERTY_NAME); + property.setValue(retentionPeriod.getTimeUnit() + "(" + retentionPeriod.getFrequencyAsInt() + ")"); + + Properties properties = new Properties(); + properties.getProperties().add(property); + RetentionStage retentionStage = new RetentionStage(); + retentionStage.setFrequency(new Frequency(retentionFrequency.getTimeUnit() + + "(" + retentionFrequency.getFrequencyAsInt() + ")")); + + if (!priority.isEmpty()) { + retentionStage.setPriority(priority); + } + if (!queue.isEmpty()) { + retentionStage.setQueue(queue); + } + retentionStage.setProperties(properties); + lifecycle.setRetentionStage(retentionStage); + } + return lifecycle; + } + + /** + * Validates feed retention frequency with expected frequency. + * @param feedName : feed name. + * @param frequency : expected frequency. + */ + private void validateFrequency(String feedName, int frequency) + throws OozieClientException, JMSException, JSONException { + List<CoordinatorJob> coordJobs = OozieUtil.getBundleCoordinators(clusterOC, + OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0)); + CoordinatorJob coordJobInfo = clusterOC.getCoordJobInfo(coordJobs.get(0).getId()); + Assert.assertEquals(coordJobInfo.getFrequency(), String.valueOf(frequency), + "Invalid retention frequency : " + frequency); + } + + /** + * Validates feed retention queue and priority with expected values. + * @param feedName : feed name. + * @param expectedPriority : expected priority. + * @param expectedQueue : expected queue. + */ + private void validatePriorityAndQueue(String feedName, String expectedPriority, String expectedQueue) + throws OozieClientException, JMSException, JSONException { + + Configuration configuration = OozieUtil.getRetentionConfiguration(clusterOC, + OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0)); + String priority = configuration.get("jobPriority"); + String queue = configuration.get("queueName"); + Assert.assertEquals(priority, expectedPriority, "Priority should be : " + expectedPriority); + Assert.assertEquals(queue, expectedQueue, "Queue should be : " + expectedQueue); + } }
