http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
deleted file mode 100644
index a0922cb..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.TestNGException;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.text.DecimalFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-
-/**
- * EL Validations tests.
- */
-@Test(groups = "embedded")
-public class ELValidationsTest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private static final Logger LOGGER = Logger.getLogger(ELValidationsTest.class);
-    private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator";
-
-
-    @Test(groups = {"0.1", "0.2"})
-    public void startInstBeforeFeedStartToday02() throws Exception {
-        String response =
-            testWith("2009-02-02T20:00Z", "2011-12-31T00:00Z", "2009-02-02T20:00Z",
-                "2011-12-31T00:00Z", "now(-40,0)", "currentYear(20,30,24,20)", false);
-        validate(response);
-    }
-
-    @Test(groups = {"singleCluster"})
-    public void startInstAfterFeedEnd() throws Exception {
-        String response = testWith(null, null, null, null,
-            "currentYear(10,0,22,0)", "now(4,20)", false);
-        validate(response);
-    }
-
-    @Test(groups = {"singleCluster"})
-    public void bothInstReverse() throws Exception {
-        String response = testWith(null, null, null, null,
-            "now(0,0)", "now(-100,0)", false);
-        validate(response);
-    }
-
-    @Test(groups = {"singleCluster"}, dataProvider = "EL-DP")
-    public void expressionLanguageTest(String startInstance, String endInstance) throws Exception {
-        testWith(null, null, null, null, startInstance, endInstance, true);
-    }
-
-    @DataProvider(name = "EL-DP")
-    public Object[][] getELData() {
-        return new Object[][]{
-            {"now(-3,0)", "now(4,20)"},
-            {"yesterday(22,0)", "now(4,20)"},
-            {"currentMonth(0,22,0)", "now(4,20)"},
-            {"lastMonth(30,22,0)", "now(4,20)"},
-            {"currentYear(0,0,22,0)", "currentYear(1,1,22,0)"},
-            {"currentMonth(0,22,0)", "currentMonth(1,22,20)"},
-            {"lastMonth(30,22,0)", "lastMonth(60,2,40)"},
-            {"lastYear(12,0,22,0)", "lastYear(13,1,22,0)"},
-        };
-    }
-
-    private void validate(String response) {
-        if ((response.contains("End instance ") || response.contains("Start instance"))
-            && (response.contains("for feed") || response.contains("of feed"))
-            && (response.contains("is before the start of feed")
-            || response.contains("is after the end of feed"))) {
-            return;
-        }
-        if (response.contains("End instance")
-            && response.contains("is before the start instance")) {
-            return;
-        }
-        Assert.fail("Response is not valid");
-    }
-
-    private String testWith(String feedStart,
-                            String feedEnd, String processStart,
-                            String processEnd,
-                            String startInstance, String endInstance, boolean isMatch)
-        throws IOException, JAXBException, ParseException, URISyntaxException {
-        HadoopUtil.uploadDir(cluster.getClusterHelper().getHadoopFS(),
-            aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-        Bundle bundle = BundleUtil.readELBundle();
-        bundle = new Bundle(bundle, cluster.getPrefix());
-        bundle.generateUniqueBundle(this);
-        bundle.setProcessWorkflow(aggregateWorkflowDir);
-        if (feedStart != null && feedEnd != null) {
-            bundle.setFeedValidity(feedStart, feedEnd, bundle.getInputFeedNameFromBundle());
-        }
-        if (processStart != null && processEnd != null) {
-            bundle.setProcessValidity(processStart, processEnd);
-        }
-        try {
-            bundle.setInvalidData();
-            bundle.setDatasetInstances(startInstance, endInstance);
-            String submitResponse = bundle.submitFeedsScheduleProcess(prism).getMessage();
-            LOGGER.info("processData in try is: " + Util.prettyPrintXml(bundle.getProcessData()));
-            TimeUtil.sleepSeconds(45);
-            if (isMatch) {
-                getAndMatchDependencies(serverOC.get(0), bundle);
-            }
-            return submitResponse;
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new TestNGException(e);
-        } finally {
-            LOGGER.info("deleting entity:");
-            bundle.deleteBundle(prism);
-        }
-    }
-
-    private void getAndMatchDependencies(OozieClient oozieClient, Bundle bundle) {
-        try {
-            List<String> bundles = null;
-            for (int i = 0; i < 10; ++i) {
-                bundles = OozieUtil.getBundles(oozieClient, bundle.getProcessName(), EntityType.PROCESS);
-                if (bundles.size() > 0) {
-                    break;
-                }
-                TimeUtil.sleepSeconds(30);
-            }
-            Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created.");
-            String coordID = bundles.get(0);
-            LOGGER.info("coord id: " + coordID);
-            List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
-            for (int i = 0; i < 10 && missingDependencies == null; ++i) {
-                TimeUtil.sleepSeconds(30);
-                missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID);
-            }
-            Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
-            for (String dependency : missingDependencies) {
-                LOGGER.info("dependency from job: " + dependency);
-            }
-            Date jobNominalTime = OozieUtil.getNominalTime(oozieClient, coordID);
-            Calendar time = Calendar.getInstance();
-            time.setTime(jobNominalTime);
-            LOGGER.info("nominalTime:" + jobNominalTime);
-            SimpleDateFormat df = new SimpleDateFormat("dd MMM yyyy HH:mm:ss");
-            LOGGER.info(
-                "nominalTime in GMT string: " + df.format(jobNominalTime.getTime()) + " GMT");
-            TimeZone z = time.getTimeZone();
-            int offset = z.getRawOffset();
-            int offsetHrs = offset / 1000 / 60 / 60;
-            int offsetMins = offset / 1000 / 60 % 60;
-
-            LOGGER.info("offset: " + offsetHrs);
-            LOGGER.info("offset: " + offsetMins);
-
-            time.add(Calendar.HOUR_OF_DAY, (-offsetHrs));
-            time.add(Calendar.MINUTE, (-offsetMins));
-
-            LOGGER.info("GMT Time: " + time.getTime());
-
-            int frequency = bundle.getInitialDatasetFrequency();
-            List<String> qaDependencyList =
-                getQADepedencyList(time, bundle.getStartInstanceProcess(time),
-                    bundle.getEndInstanceProcess(time),
-                    frequency, bundle);
-            for (String qaDependency : qaDependencyList) {
-                LOGGER.info("qa qaDependencyList: " + qaDependency);
-            }
-
-            Assert.assertTrue(matchDependencies(missingDependencies, qaDependencyList));
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new TestNGException(e);
-        }
-    }
-
-    private boolean matchDependencies(List<String> fromJob, List<String> qaList) {
-        if (fromJob.size() != qaList.size()) {
-            return false;
-        }
-        Collections.sort(fromJob);
-        Collections.sort(qaList);
-        for (int index = 0; index < fromJob.size(); index++) {
-            if (!fromJob.get(index).contains(qaList.get(index))) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private List<String> getQADepedencyList(Calendar nominalTime, Date startRef,
-                                            Date endRef, int frequency, Bundle bundle) {
-        LOGGER.info("start ref:" + startRef);
-        LOGGER.info("end ref:" + endRef);
-        Calendar initialTime = Calendar.getInstance();
-        initialTime.setTime(startRef);
-        Calendar finalTime = Calendar.getInstance();
-
-        finalTime.setTime(endRef);
-        String path = bundle.getDatasetPath();
-
-        TimeZone tz = TimeZone.getTimeZone("GMT");
-        nominalTime.setTimeZone(tz);
-        LOGGER.info("nominalTime: " + initialTime.getTime());
-        LOGGER.info("finalTime: " + finalTime.getTime());
-        List<String> returnList = new ArrayList<>();
-        while (initialTime.getTime().before(finalTime.getTime())) {
-            LOGGER.info("initialTime: " + initialTime.getTime());
-            returnList.add(getPath(path, initialTime));
-            initialTime.add(Calendar.MINUTE, frequency);
-        }
-        returnList.add(getPath(path, initialTime));
-        Collections.reverse(returnList);
-        return returnList;
-    }
-
-    private String getPath(String path, Calendar time) {
-        if (path.contains("${YEAR}")) {
-            path = path.replaceAll("\\$\\{YEAR\\}", Integer.toString(time.get(Calendar.YEAR)));
-        }
-        if (path.contains("${MONTH}")) {
-            path = path.replaceAll("\\$\\{MONTH\\}", intToString(time.get(Calendar.MONTH) + 1, 2));
-        }
-        if (path.contains("${DAY}")) {
-            path = path.replaceAll("\\$\\{DAY\\}", intToString(time.get(Calendar.DAY_OF_MONTH), 2));
-        }
-        if (path.contains("${HOUR}")) {
-            path = path.replaceAll("\\$\\{HOUR\\}", intToString(time.get(Calendar.HOUR_OF_DAY), 2));
-        }
-        if (path.contains("${MINUTE}")) {
-            path = path.replaceAll("\\$\\{MINUTE\\}", intToString(time.get(Calendar.MINUTE), 2));
-        }
-        return path;
-    }
-
-    private String intToString(int num, int digits) {
-        assert digits > 0 : "Invalid number of digits";
-
-        // create variable length array of zeros
-        char[] zeros = new char[digits];
-        Arrays.fill(zeros, '0');
-
-        // format number as String
-        DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
-        return df.format(num);
-    }
-}
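
For context, the path-building logic at the heart of the deleted test boils down to substituting ${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE} placeholders with zero-padded calendar fields. The following standalone sketch condenses the deleted getPath and intToString helpers; the DatePathDemo class name is illustrative, not part of the test code:

    import java.text.DecimalFormat;
    import java.util.Arrays;
    import java.util.Calendar;
    import java.util.TimeZone;

    public class DatePathDemo {
        // Zero-pad a number to a fixed width, as the deleted intToString helper did:
        // a DecimalFormat built from a string of zeros ("00") formats 3 as "03".
        static String intToString(int num, int digits) {
            char[] zeros = new char[digits];
            Arrays.fill(zeros, '0');
            return new DecimalFormat(String.valueOf(zeros)).format(num);
        }

        // Substitute the Falcon-style ${...} placeholders with calendar fields,
        // mirroring the deleted getPath helper (replaceAll is a no-op when a
        // placeholder is absent, so the contains() guards are dropped here).
        static String getPath(String path, Calendar time) {
            path = path.replaceAll("\\$\\{YEAR\\}", Integer.toString(time.get(Calendar.YEAR)));
            path = path.replaceAll("\\$\\{MONTH\\}", intToString(time.get(Calendar.MONTH) + 1, 2));
            path = path.replaceAll("\\$\\{DAY\\}", intToString(time.get(Calendar.DAY_OF_MONTH), 2));
            path = path.replaceAll("\\$\\{HOUR\\}", intToString(time.get(Calendar.HOUR_OF_DAY), 2));
            path = path.replaceAll("\\$\\{MINUTE\\}", intToString(time.get(Calendar.MINUTE), 2));
            return path;
        }

        public static void main(String[] args) {
            Calendar time = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
            // prints e.g. /data/2015/03/07/09/05 for 2015-03-07T09:05Z
            System.out.println(getPath("/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", time));
        }
    }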

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
deleted file mode 100644
index c49c381..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-/**
- * Embedded pig script test.
- */
-@Test(groups = "embedded")
-public class EmbeddedPigScriptTest extends BaseTestClass {
-
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private String pigTestDir = cleanAndGetTestDir();
-    private String pigScriptDir = pigTestDir + "/pig";
-    private String pigScriptLocation = pigScriptDir + "/id.pig";
-    private String inputPath = pigTestDir + "/input" + MINUTE_DATE_PATTERN;
-    private static final Logger LOGGER = Logger.getLogger(EmbeddedPigScriptTest.class);
-    private static final double TIMEOUT = 15;
-    private String processName;
-    private String process;
-
-    @BeforeClass(alwaysRun = true)
-    public void createTestData() throws Exception {
-        LOGGER.info("in @BeforeClass");
-
-        //copy pig script
-        HadoopUtil.uploadDir(clusterFS, pigScriptDir, OSUtil.concat(OSUtil.RESOURCES, "pig"));
-        Bundle bundle = BundleUtil.readELBundle();
-        bundle.generateUniqueBundle(this);
-        bundle = new Bundle(bundle, cluster);
-        String startDate = "2010-01-02T00:40Z";
-        String endDate = "2010-01-02T01:10Z";
-        bundle.setInputFeedDataPath(inputPath);
-        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
-            bundle.getFeedDataPathPrefix(), dataDates);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].generateUniqueBundle(this);
-        bundles[0].setInputFeedDataPath(inputPath);
-        bundles[0].setOutputFeedLocationData(pigTestDir + "/output-data" + MINUTE_DATE_PATTERN);
-        bundles[0].setProcessWorkflow(pigScriptLocation);
-        bundles[0].setProcessInputNames("INPUT");
-        bundles[0].setProcessOutputNames("OUTPUT");
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:10Z");
-        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
-        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
-
-        final ProcessMerlin processElement = bundles[0].getProcessObject();
-        processElement.clearProperties().withProperty("queueName", "default");
-        processElement.getWorkflow().setEngine(EngineType.PIG);
-        bundles[0].setProcessData(processElement.toString());
-        bundles[0].submitFeedsScheduleProcess(prism);
-        process = bundles[0].getProcessData();
-        processName = Util.readEntityName(process);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 600000)
-    public void getResumedProcessInstance() throws Exception {
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
-        prism.getProcessHelper().suspend(process);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        ServiceResponse status = prism.getProcessHelper().getStatus(process);
-        Assert.assertTrue(status.getMessage().contains("SUSPENDED"), "Process not suspended.");
-        prism.getProcessHelper().resume(process);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 600000)
-    public void getSuspendedProcessInstance() throws Exception {
-        prism.getProcessHelper().suspend(process);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccessWOInstances(r);
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 600000)
-    public void getRunningProcessInstance() throws Exception {
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 600000)
-    public void getKilledProcessInstance() throws Exception {
-        prism.getProcessHelper().delete(process);
-        TimeUtil.sleepSeconds(TIMEOUT);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND);
-    }
-
-    @Test(groups = {"singleCluster"}, timeOut = 6000000)
-    public void getSucceededProcessInstance() throws Exception {
-        AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
-        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
-        int counter = OSUtil.IS_WINDOWS ? 100 : 50;
-        OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter);
-        r = prism.getProcessHelper().getRunningInstance(processName);
-        InstanceUtil.validateSuccessWOInstances(r);
-    }
-}
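
These tests mostly use a fixed sleep (TimeUtil.sleepSeconds) followed by a status assertion, while getAndMatchDependencies in ELValidationsTest above uses a bounded retry loop. For reference, the retry pattern can be expressed generically with nothing beyond the JDK; WaitUtil here is a hypothetical helper, not part of the Falcon test harness:

    import java.util.concurrent.Callable;

    public final class WaitUtil {
        private WaitUtil() {
        }

        // Re-check a condition at a fixed interval until it holds or attempts
        // run out, mirroring the sleep-and-recheck loops in these tests.
        public static boolean waitFor(Callable<Boolean> condition, int attempts,
                                      long intervalMillis) throws Exception {
            for (int i = 0; i < attempts; i++) {
                if (condition.call()) {
                    return true;
                }
                Thread.sleep(intervalMillis);
            }
            return false;
        }
    }

A caller would write, for example, waitFor(() -> getStatus().contains("SUSPENDED"), 10, 15000) in place of a single unconditional sleep, which fails fast when the state is reached early.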

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
deleted file mode 100644
index 728b797..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.ActionType;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.MatrixUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * Tests for operations with external file systems.
- */
-@Test(groups = "embedded")
-public class ExternalFSTest extends BaseTestClass{
-
-    public static final String WASB_END_POINT =
-        "wasb://" + MerlinConstants.WASB_CONTAINER + "@" + MerlinConstants.WASB_ACCOUNT;
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private FileSystem wasbFS;
-    private Bundle externalBundle;
-
-    private String baseTestDir = cleanAndGetTestDir();
-    private String sourcePath = baseTestDir + "/source";
-    private String baseWasbDir = "/falcon-regression/" + UUID.randomUUID().toString().split("-")[0];
-    private String testWasbTargetDir = baseWasbDir + '/'
-        + UUID.randomUUID().toString().split("-")[0] + '/';
-
-    private static final Logger LOGGER = Logger.getLogger(ExternalFSTest.class);
-
-    @BeforeClass
-    public void setUpClass() throws IOException {
-        HadoopUtil.recreateDir(clusterFS, baseTestDir);
-        Configuration conf = new Configuration();
-        conf.set("fs.defaultFS", WASB_END_POINT);
-        conf.set("fs.azure.account.key." + MerlinConstants.WASB_ACCOUNT,
-            MerlinConstants.WASB_SECRET);
-        conf.setBoolean("fs.hdfs.impl.disable.cache", false);
-        wasbFS = FileSystem.get(conf);
-        LOGGER.info("creating base wasb dir" + baseWasbDir);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setUp() throws JAXBException, IOException {
-        Bundle bundle = BundleUtil.readFeedReplicationBundle();
-
-        bundles[0] = new Bundle(bundle, cluster);
-        externalBundle = new Bundle(bundle, cluster);
-
-        bundles[0].generateUniqueBundle(this);
-        externalBundle.generateUniqueBundle(this);
-
-        LOGGER.info("checking wasb credentials with location: " + testWasbTargetDir);
-        wasbFS.create(new Path(testWasbTargetDir));
-        wasbFS.delete(new Path(testWasbTargetDir), true);
-    }
-
-    @AfterMethod
-    public void tearDown() throws IOException {
-        removeTestClassEntities();
-        wasbFS.delete(new Path(testWasbTargetDir), true);
-    }
-
-    @AfterClass(alwaysRun = true)
-    public void tearDownClass() throws IOException {
-        wasbFS.delete(new Path(baseWasbDir), true);
-    }
-
-
-    @Test(dataProvider = "getInvalidTargets")
-    public void invalidCredentialsExtFS(String endpoint) throws Exception {
-        bundles[0].setClusterInterface(Interfacetype.READONLY, endpoint);
-        bundles[0].setClusterInterface(Interfacetype.WRITE, endpoint);
-
-        AssertUtil.assertFailed(prism.getClusterHelper()
-            .submitEntity(bundles[0].getClusterElement().toString()));
-
-    }
-
-    @Test(dataProvider = "getData")
-    public void replicateToExternalFS(final FileSystem externalFS,
-                                      final String separator, final boolean withData) throws Exception {
-        final String endpoint = externalFS.getUri().toString();
-        Bundle.submitCluster(bundles[0], externalBundle);
-        String startTime = TimeUtil.getTimeWrtSystemTime(0);
-        String endTime = TimeUtil.addMinsToTime(startTime, 5);
-        LOGGER.info("Time range between : " + startTime + " and " + endTime);
-        String datePattern = StringUtils.join(
-            new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}", "${MINUTE}"}, separator);
-
-        //configure feed
-        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
-        String targetDataLocation = endpoint + testWasbTargetDir + datePattern;
-        feed.setFilePath(sourcePath + '/' + datePattern);
-        //erase all clusters from feed definition
-        feed.clearFeedClusters();
-        //set local cluster as source
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build());
-        //set externalFS cluster as target
-        feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(externalBundle.getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build());
-
-        //submit and schedule feed
-        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
-        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
-        datePattern = StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH", "mm"}, separator);
-        //upload necessary data
-        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
-        DateTimeFormatter fmt = DateTimeFormat.forPattern(datePattern);
-        String timePattern = fmt.print(date);
-        HadoopUtil.recreateDir(clusterFS, sourcePath + '/' + timePattern);
-        if (withData) {
-            HadoopUtil.copyDataToFolder(clusterFS, sourcePath + '/' + timePattern, OSUtil.SINGLE_FILE);
-        }
-
-        Path srcPath = new Path(sourcePath + '/' + timePattern);
-        Path dstPath = new Path(endpoint + testWasbTargetDir + '/' + timePattern);
-
-        //check if coordinator exists
-        TimeUtil.sleepSeconds(10);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, feed.toString(), 0);
-        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(clusterOC, feed.getName(), "REPLICATION"), 1);
-
-        //replication should start, wait while it ends
-        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed.toString()), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
-        //check if data has been replicated correctly
-        List<Path> cluster1ReplicatedData =
-            HadoopUtil.getAllFilesRecursivelyHDFS(clusterFS, srcPath);
-        List<Path> cluster2ReplicatedData =
-            HadoopUtil.getAllFilesRecursivelyHDFS(externalFS, dstPath);
-        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-        final ContentSummary srcSummary = clusterFS.getContentSummary(srcPath);
-        final ContentSummary dstSummary = externalFS.getContentSummary(dstPath);
-        Assert.assertEquals(dstSummary.getLength(), srcSummary.getLength());
-    }
-
-
-
-    @DataProvider
-    public Object[][] getData() {
-        //"-" for single directory, "/" - for dir with subdirs
-        return MatrixUtil.crossProduct(new FileSystem[]{wasbFS},
-            new String[]{"/", "-"},
-            new Boolean[]{true, false});
-    }
-
-    @DataProvider
-    public Object[][] getInvalidTargets() {
-        return new Object[][]{{"wasb://[email protected]/"}};
-    }
-
-}
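
The WASB setup in the deleted setUpClass above reduces to pointing a Hadoop Configuration at the wasb:// endpoint and registering the storage-account key. A minimal sketch using only stock Hadoop APIs; the container, account, and key values are placeholders (the test reads them from MerlinConstants), and hadoop-azure must be on the classpath for the wasb scheme to resolve:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WasbAccessDemo {
        public static void main(String[] args) throws Exception {
            // Placeholder credentials; not real values.
            String container = "my-container";
            String account = "myaccount.blob.core.windows.net";
            String secret = "<storage-account-key>";

            Configuration conf = new Configuration();
            // Point the default filesystem at the wasb endpoint, as setUpClass does.
            conf.set("fs.defaultFS", "wasb://" + container + "@" + account);
            conf.set("fs.azure.account.key." + account, secret);

            FileSystem wasbFS = FileSystem.get(conf);
            // Simple credential check, mirroring the test's setUp:
            // create a path, then delete it.
            Path probe = new Path("/falcon-regression/probe");
            wasbFS.create(probe).close();
            wasbFS.delete(probe, true);
        }
    }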
- */ - -package org.apache.falcon.regression; - -import org.apache.falcon.regression.Entities.FeedMerlin; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.entity.v0.feed.ActionType; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.response.ServiceResponse; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.core.util.XmlUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.apache.oozie.client.OozieClient; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - -/** - * Feed cluster update tests. - */ -@Test(groups = "distributed") -public class FeedClusterUpdateTest extends BaseTestClass { - - private String baseTestDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestDir + "/aggregator"; - private ColoHelper cluster1 = servers.get(0); - private ColoHelper cluster2 = servers.get(1); - private ColoHelper cluster3 = servers.get(2); - private OozieClient cluster1OC = serverOC.get(0); - private OozieClient cluster2OC = serverOC.get(1); - private OozieClient cluster3OC = serverOC.get(2); - private FileSystem cluster2FS = serverFS.get(1); - private FileSystem cluster3FS = serverFS.get(2); - private String feed; - private String feedName; - private String startTime; - private String feedOriginalSubmit; - private String feedUpdated; - private String cluster1Name; - private String cluster2Name; - private String cluster3Name; - private static final Logger LOGGER = Logger.getLogger(FeedClusterUpdateTest.class); - - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - Bundle bundle = BundleUtil.readELBundle(); - for (int i = 0; i < 3; i++) { - bundles[i] = new Bundle(bundle, servers.get(i)); - bundles[i].generateUniqueBundle(this); - bundles[i].setProcessWorkflow(aggregateWorkflowDir); - } - try { - String postFix = "/US/" + servers.get(1).getClusterHelper().getColoName(); - HadoopUtil.deleteDirIfExists(baseTestDir, cluster2FS); - HadoopUtil.lateDataReplenish(cluster2FS, 80, 1, baseTestDir, postFix); - postFix = "/UK/" + servers.get(2).getClusterHelper().getColoName(); - HadoopUtil.deleteDirIfExists(baseTestDir, cluster3FS); - HadoopUtil.lateDataReplenish(cluster3FS, 80, 1, baseTestDir, postFix); - } finally { - removeTestClassEntities(); - } - } - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - Bundle bundle = BundleUtil.readELBundle(); - for (int i = 0; i < 3; i++) { - bundles[i] = new Bundle(bundle, servers.get(i)); - bundles[i].generateUniqueBundle(this); - bundles[i].setProcessWorkflow(aggregateWorkflowDir); - } - BundleUtil.submitAllClusters(prism, bundles[0], bundles[1], bundles[2]); - feed = bundles[0].getDataSets().get(0); - feed = 
FeedMerlin.fromString(feed).clearFeedClusters().toString(); - startTime = TimeUtil.getTimeWrtSystemTime(-50); - feedName = Util.readEntityName(feed); - cluster1Name = Util.readEntityName(bundles[0].getClusters().get(0)); - cluster2Name = Util.readEntityName(bundles[1].getClusters().get(0)); - cluster3Name = Util.readEntityName(bundles[2].getClusters().get(0)); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - @Test(enabled = true, groups = {"multiCluster"}) - public void addSourceCluster() throws Exception { - //add one source and one target , schedule only on source - feedOriginalSubmit = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .build()) - .toString(); - feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster1Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 20), - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - - LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit)); - ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit); - TimeUtil.sleepSeconds(10); - AssertUtil.assertSucceeded(response); - - //schedule on source - response = cluster2.getFeedHelper().schedule(feedOriginalSubmit); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0); - - //prepare updated Feed - feedUpdated = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster1Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 20), - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster3Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 40), - TimeUtil.addMinsToTime(startTime, 110)) - .withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - - response = prism.getFeedHelper().update(feedUpdated, feedUpdated); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - prism.getFeedHelper().submitAndSchedule(feedUpdated); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, 
"REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1); - } - - @Test(enabled = true, groups = {"multiCluster"}) - public void addTargetCluster() throws Exception { - //add one source and one target , schedule only on source - feedOriginalSubmit = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .build()) - .toString(); - feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster3Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 40), - TimeUtil.addMinsToTime(startTime, 110)) - .withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - - LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit)); - ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit); - TimeUtil.sleepSeconds(10); - AssertUtil.assertSucceeded(response); - - //schedule on source - response = cluster2.getFeedHelper().schedule(feedOriginalSubmit); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0); - - //prepare updated Feed - feedUpdated = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .withPartition("US/${cluster.colo}") - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster1Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 20), - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster3Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 40), - TimeUtil.addMinsToTime(startTime, 110)) - .withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - - LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated)); - response = prism.getFeedHelper().update(feedUpdated, feedUpdated); - TimeUtil.sleepSeconds(20); - 
AssertUtil.assertSucceeded(response); - - prism.getFeedHelper().submitAndSchedule(feedUpdated); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1); - } - - @Test(enabled = true, groups = {"multiCluster"}) - public void add2SourceCluster() throws Exception { - //add one source , schedule only on source - feedOriginalSubmit = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .build()) - .toString(); - - LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit)); - ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit); - TimeUtil.sleepSeconds(10); - AssertUtil.assertSucceeded(response); - - //schedule on source - response = cluster2.getFeedHelper().schedule(feedOriginalSubmit); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0); - - //prepare updated Feed - feedUpdated = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .withPartition("US/${cluster.colo}") - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster1Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 20), - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.SOURCE) - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster3Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 40), - TimeUtil.addMinsToTime(startTime, 110)) - .withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - - LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated)); - response = prism.getFeedHelper().update(feedUpdated, feedUpdated); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - prism.getFeedHelper().submitAndSchedule(feedUpdated); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - 
Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1); - } - - @Test(enabled = true, groups = {"multiCluster"}) - public void add2TargetCluster() throws Exception { - //add one source and one target , schedule only on source - feedOriginalSubmit = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .build()) - .toString(); - - LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit)); - ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit); - TimeUtil.sleepSeconds(10); - AssertUtil.assertSucceeded(response); - - //schedule on source - response = cluster2.getFeedHelper().schedule(feedOriginalSubmit); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0); - - //prepare updated Feed - feedUpdated = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster1Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 20), - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster3Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 40), - TimeUtil.addMinsToTime(startTime, 110)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - - LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated)); - response = prism.getFeedHelper().update(feedUpdated, feedUpdated); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - prism.getFeedHelper().submitAndSchedule(feedUpdated); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1); - 
Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1); - } - - @Test(enabled = true, groups = {"multiCluster"}) - public void add1Source1TargetCluster() throws Exception { - //add one source and one target , schedule only on source - feedOriginalSubmit = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .build()) - .toString(); - - LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit)); - ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit); - TimeUtil.sleepSeconds(10); - AssertUtil.assertSucceeded(response); - - //schedule on source - response = cluster2.getFeedHelper().schedule(feedOriginalSubmit); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 0); - - //prepare updated Feed - feedUpdated = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .withPartition("US/${cluster.colo}") - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster1Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 20), - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster3Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 40), - TimeUtil.addMinsToTime(startTime, 110)) - .withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - - LOGGER.info("Updated Feed: " + Util.prettyPrintXml(feedUpdated)); - response = prism.getFeedHelper().update(feedUpdated, feedUpdated); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - prism.getFeedHelper().submitAndSchedule(feedUpdated); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1); - } - - @Test(enabled = true, groups = 
{"multiCluster"}) - public void deleteSourceCluster() throws Exception { - //add one source and one target , schedule only on source - feedOriginalSubmit = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .withPartition("US/${cluster.colo}") - .build()) - .toString(); - feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster1Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 20), - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster3Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 40), - TimeUtil.addMinsToTime(startTime, 110)) - .withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - - LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit)); - ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit); - TimeUtil.sleepSeconds(10); - AssertUtil.assertSucceeded(response); - - //schedule on source - response = prism.getFeedHelper().schedule(feedOriginalSubmit); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1); - - //prepare updated Feed - feedUpdated = FeedMerlin.fromString(feed) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .build()) - .toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster1Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 20), - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - - response = prism.getFeedHelper().update(feedUpdated, feedUpdated); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - response = cluster3.getFeedHelper().getEntityDefinition(feedUpdated); - AssertUtil.assertFailed(response); - - prism.getFeedHelper().submitAndSchedule(feedUpdated); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, 
feedName, "REPLICATION"), 3); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 2); - } - - @Test(enabled = true, groups = {"multiCluster"}) - public void deleteTargetCluster() throws Exception { - /* - this test creates a multiCluster feed. Cluster1 is the target cluster - and cluster3 and Cluster2 are the source cluster. - - feed is submitted through prism so submitted to both target and - source. Feed is scheduled through prism, so only on Cluster3 and - Cluster2 retention coord should exists. Cluster1 one which - is target both retention and replication coord should exists. there - will be 2 replication coord, one each for each source cluster. - - then we update feed by deleting cluster1 and cluster2 from the feed - xml and send update request. - - Once update is over. definition should go missing from cluster1 and - cluster2 and prism and cluster3 should have new def - - there should be a new retention coord on cluster3 and old number of - coord on cluster1 and cluster2 - */ - - //add two source and one target - feedOriginalSubmit = FeedMerlin.fromString(feed).clearFeedClusters().toString(); - - feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit) - .addFeedCluster(new FeedMerlin.FeedClusterBuilder(cluster2Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 65)) - .withClusterType(ClusterType.SOURCE) - .withPartition("US/${cluster.colo}") - .build()) - .toString(); - feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster1Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 20), - TimeUtil.addMinsToTime(startTime, 85)) - .withClusterType(ClusterType.TARGET) - .build()) - .toString(); - feedOriginalSubmit = FeedMerlin.fromString(feedOriginalSubmit).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster3Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 40), - TimeUtil.addMinsToTime(startTime, 110)) - .withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - - LOGGER.info("Feed: " + Util.prettyPrintXml(feedOriginalSubmit)); - ServiceResponse response = prism.getFeedHelper().submitEntity(feedOriginalSubmit); - TimeUtil.sleepSeconds(10); - AssertUtil.assertSucceeded(response); - - //schedule on source - response = prism.getFeedHelper().schedule(feedOriginalSubmit); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1); - - //prepare updated Feed - feedUpdated = FeedMerlin.fromString(feed).clearFeedClusters().toString(); - feedUpdated = FeedMerlin.fromString(feedUpdated).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(cluster3Name) - .withRetention("hours(10)", ActionType.DELETE) - .withValidity(TimeUtil.addMinsToTime(startTime, 40), - TimeUtil.addMinsToTime(startTime, 110)) - 
.withClusterType(ClusterType.SOURCE) - .withPartition("UK/${cluster.colo}") - .build()) - .toString(); - - LOGGER.info("Feed: " + Util.prettyPrintXml(feedUpdated)); - response = prism.getFeedHelper().update(feedUpdated, feedUpdated); - TimeUtil.sleepSeconds(20); - AssertUtil.assertSucceeded(response); - - //verify xmls definitions - response = cluster1.getFeedHelper().getEntityDefinition(feedUpdated); - AssertUtil.assertFailed(response); - response = cluster2.getFeedHelper().getEntityDefinition(feedUpdated); - AssertUtil.assertFailed(response); - response = cluster3.getFeedHelper().getEntityDefinition(feedUpdated); - Assert.assertTrue(XmlUtil.isIdentical(feedUpdated, response.getMessage())); - response = prism.getFeedHelper().getEntityDefinition(feedUpdated); - Assert.assertTrue(XmlUtil.isIdentical(feedUpdated, response.getMessage())); - - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "REPLICATION"), 0); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feedName, "RETENTION"), 1); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "REPLICATION"), 2); - Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feedName, "RETENTION"), 1); - } - - /* - @Test(enabled = false) - public void delete2SourceCluster() { - - } - - @Test(enabled = false) - public void delete2TargetCluster() { - - } - - @Test(enabled = false) - public void delete1Source1TargetCluster() { - - } - */ -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java deleted file mode 100644 index ecb5798..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
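
The two deleted tests above exercise Falcon's feed-update path by rebuilding the feed XML with the FeedMerlin builder, resubmitting it through prism, and then counting REPLICATION/RETENTION coordinators per cluster. For readers skimming the diff, the builder idiom they repeat can be reduced to a small helper. This is a minimal sketch using only the builder calls visible above; the class name, method name, and parameters (feedXml, sourceName, targetName, start, end) are illustrative placeholders, and the import paths are assumed from the falcon-regression module:

    import org.apache.falcon.entity.v0.feed.ActionType;
    import org.apache.falcon.entity.v0.feed.ClusterType;
    import org.apache.falcon.regression.Entities.FeedMerlin;

    /** Hypothetical helper, not part of the codebase. */
    public final class FeedClusterSketch {
        private FeedClusterSketch() {
        }

        /** Rebuilds a feed definition with one source and one target cluster. */
        public static String withSourceAndTarget(String feedXml, String sourceName,
                                                 String targetName, String start, String end) {
            // Drop whatever clusters the template carries, then add fresh ones.
            String feed = FeedMerlin.fromString(feedXml).clearFeedClusters().toString();
            feed = FeedMerlin.fromString(feed)
                .addFeedCluster(new FeedMerlin.FeedClusterBuilder(sourceName)
                    .withRetention("hours(10)", ActionType.DELETE)
                    .withValidity(start, end)
                    .withClusterType(ClusterType.SOURCE)
                    .withPartition("US/${cluster.colo}")
                    .build())
                .toString();
            feed = FeedMerlin.fromString(feed)
                .addFeedCluster(new FeedMerlin.FeedClusterBuilder(targetName)
                    .withRetention("hours(10)", ActionType.DELETE)
                    .withValidity(start, end)
                    .withClusterType(ClusterType.TARGET)
                    .build())
                .toString();
            return feed;
        }
    }

Removing a cluster from a feed, as deleteSourceCluster and deleteTargetCluster do, is then just rebuilding the XML without that cluster and passing it to prism.getFeedHelper().update(...).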
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
deleted file mode 100644
index ecb5798..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.FeedInstanceResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Test for https://issues.apache.org/jira/browse/FALCON-761.
- */
-@Test(groups = "embedded", timeOut = 900000)
-public class FeedInstanceListingTest extends BaseTestClass {
-    private String baseTestDir = cleanAndGetTestDir();
-    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
-    private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN;
-    private String feedOutputPath = baseTestDir + "/output-data" + MINUTE_DATE_PATTERN;
-    private String processName;
-
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-
-    private static final Logger LOGGER = Logger.getLogger(FeedInstanceListingTest.class);
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        bundles[0] = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundles[0], cluster);
-        bundles[0].generateUniqueBundle(this);
-        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
-        bundles[0].setInputFeedDataPath(feedInputPath);
-        bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
-        bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
-        bundles[0].setOutputFeedLocationData(feedOutputPath);
-        bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes);
-        processName = bundles[0].getProcessName();
-        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() throws IOException {
-        cleanTestsDirs();
-        removeTestClassEntities();
-    }
-
-    /**
-     * Test when all data is available for all instances.
-     */
-    @Test
-    public void testFeedListingWhenAllAvailable() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        List<List<String>> missingDependencies = OozieUtil.createMissingDependencies(cluster,
-            EntityType.PROCESS, processName, 0);
-        List<String> missingDependencyLastInstance = missingDependencies.get(missingDependencies.size() - 1);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, missingDependencyLastInstance);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
-        FeedInstanceResult r = prism.getFeedHelper()
-            .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 0, 0, 5);
-    }
-
-    /**
-     * Test when only empty directories exist for all instances.
-     */
-    @Test
-    public void testFeedListingWhenAllEmpty() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
-        FeedInstanceResult r = prism.getFeedHelper()
-            .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 5, 0, 0);
-    }
-
-    /**
-     * Test when no data is present for any instance.
-     */
-    @Test
-    public void testFeedListingWhenAllMissing() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        FeedInstanceResult r = prism.getFeedHelper()
-            .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 5, 0, 0, 0);
-    }
-
-    /**
-     * Initially no availability flag is set for the feed, and data is created,
-     * so the instance status is available. Then the availability flag is set and
-     * the feed is updated; the instance status should change to partial.
-     */
-    @Test
-    public void testFeedListingAfterFeedAvailabilityFlagUpdate() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        List<List<String>> missingDependencies = OozieUtil.createMissingDependencies(cluster,
-            EntityType.PROCESS, processName, 0);
-        List<String> missingDependencyLastInstance = missingDependencies.get(missingDependencies.size() - 1);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, missingDependencyLastInstance);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
-        FeedInstanceResult r = prism.getFeedHelper()
-            .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 0, 0, 5);
-        String inputFeed = bundles[0].getInputFeedFromBundle();
-        bundles[0].setInputFeedAvailabilityFlag("_SUCCESS");
-        ServiceResponse serviceResponse = prism.getFeedHelper().update(inputFeed, bundles[0].getInputFeedFromBundle());
-        AssertUtil.assertSucceeded(serviceResponse);
-        //Since we have not created the availability flag on HDFS, the feed instance status should be partial
-        r = prism.getFeedHelper()
-            .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 0, 5, 0);
-    }
-
-    /**
-     * Data is created for the feed, so the instance status is available.
-     * Then the data path is changed and the feed is updated. Since no data exists
-     * at the new path, the instance status should change to missing.
-     */
-    @Test
-    public void testFeedListingAfterFeedDataPathUpdate() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        List<List<String>> missingDependencies = OozieUtil.createMissingDependencies(cluster,
-            EntityType.PROCESS, processName, 0);
-        List<String> missingDependencyLastInstance = missingDependencies.get(missingDependencies.size() - 1);
-        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, missingDependencyLastInstance);
-        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
-        FeedInstanceResult r = prism.getFeedHelper()
-            .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 0, 0, 0, 5);
-        String inputFeed = bundles[0].getInputFeedFromBundle();
-        bundles[0].setInputFeedDataPath(baseTestDir + "/inputNew" + MINUTE_DATE_PATTERN);
-        ServiceResponse serviceResponse = prism.getFeedHelper().update(inputFeed, bundles[0].getInputFeedFromBundle());
-        AssertUtil.assertSucceeded(serviceResponse);
-        //Since we have not created directories for the new path, the feed instance status should be missing
-        r = prism.getFeedHelper()
-            .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 5, 0, 0, 0);
-    }
-
-    /**
-     * Submit the feeds on prism and request the instance status on the server.
-     * The request should succeed.
-     */
-    @Test
-    public void testFeedListingFeedSubmitOnPrismRequestOnServer() throws Exception {
-        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
-        bundles[0].setProcessConcurrency(1);
-        bundles[0].submitFeedsScheduleProcess(prism);
-        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
-        FeedInstanceResult r = cluster.getFeedHelper()
-            .getFeedInstanceListing(Util.readEntityName(bundles[0].getDataSets().get(0)),
-                "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
-        validateResponse(r, 5, 5, 0, 0, 0);
-    }
-
-    /**
-     * Checks that the actual number of instances with each status equals the expected number.
-     *
-     * @param instancesResult API response which should contain information about
-     *                        instances <p/>
-     *                        All parameters below reflect the expected number of
-     *                        instances with a given status.
-     * @param totalCount total number of instances.
-     * @param missingCount number of missing instances.
-     * @param emptyCount number of empty instances.
-     * @param partialCount number of partial instances.
-     * @param availableCount number of available instances.
-     */
-    private void validateResponse(FeedInstanceResult instancesResult, int totalCount,
-                                  int missingCount, int emptyCount, int partialCount, int availableCount) {
-        FeedInstanceResult.Instance[] instances = instancesResult.getInstances();
-        LOGGER.info("instances: " + Arrays.toString(instances));
-        Assert.assertNotNull(instances, "instances should not be null");
-        Assert.assertEquals(instances.length, totalCount, "Total Instances");
-        List<String> statuses = new ArrayList<>();
-        for (FeedInstanceResult.Instance instance : instances) {
-            Assert.assertNotNull(instance.getCluster());
-            Assert.assertNotNull(instance.getInstance());
-            Assert.assertNotNull(instance.getStatus());
-            Assert.assertNotNull(instance.getUri());
-            Assert.assertNotNull(instance.getCreationTime());
-            Assert.assertNotNull(instance.getSize());
-            final String status = instance.getStatus();
-            LOGGER.info("status: " + status + ", instance: " + instance.getInstance());
-            statuses.add(status);
-        }
-
-        Assert.assertEquals(Collections.frequency(statuses, "MISSING"),
-            missingCount, "Missing Instances");
-        Assert.assertEquals(Collections.frequency(statuses, "EMPTY"),
-            emptyCount, "Empty Instances");
-        Assert.assertEquals(Collections.frequency(statuses, "PARTIAL"),
-            partialCount, "Partial Instances");
-        Assert.assertEquals(Collections.frequency(statuses, "AVAILABLE"),
-            availableCount, "Available Instances");
-    }
-}
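
A note on the validateResponse helper above: it reduces the listing to per-status counts (MISSING, EMPTY, PARTIAL, AVAILABLE) via Collections.frequency. The same check can be sketched outside the test harness; this assumes only the FeedInstanceResult API exercised above, and FeedListingChecks/countWithStatus are illustrative names:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.falcon.resource.FeedInstanceResult;

    /** Hypothetical helper, not part of the codebase. */
    final class FeedListingChecks {
        private FeedListingChecks() {
        }

        /** Counts listing entries carrying the given status, e.g. "AVAILABLE". */
        static int countWithStatus(FeedInstanceResult result, String status) {
            List<String> statuses = new ArrayList<>();
            for (FeedInstanceResult.Instance instance : result.getInstances()) {
                statuses.add(instance.getStatus());
            }
            return Collections.frequency(statuses, status);
        }
    }

With this, an assertion like validateResponse(r, 5, 0, 0, 0, 5) is equivalent to checking that countWithStatus(r, "AVAILABLE") == 5 and that the other three statuses each count 0 across 5 instances in total.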
