Repository: incubator-falcon Updated Branches: refs/heads/master 9f01d25da -> 2bcc9f9f8
FALCON-844 List instances tests. Contributed by Paul Isaychuk Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2bcc9f9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2bcc9f9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2bcc9f9f Branch: refs/heads/master Commit: 2bcc9f9f893575fab58c4a4a994b7ca0f5c96729 Parents: 9f01d25 Author: Ruslan Ostafiychuk <rostafiyc...@apache.org> Authored: Thu Oct 30 15:34:24 2014 +0200 Committer: Ruslan Ostafiychuk <rostafiyc...@apache.org> Committed: Thu Oct 30 15:34:24 2014 +0200 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 2 + .../core/interfaces/IEntityManagerHelper.java | 19 +- .../falcon/regression/core/util/HadoopUtil.java | 20 +- .../falcon/regression/core/util/Util.java | 3 +- .../lineage/ListFeedInstancesTest.java | 461 +++++++++++++++++++ .../lineage/ListProcessInstancesTest.java | 358 ++++++++++++++ 6 files changed, 856 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bcc9f9f/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 9c1da52..f022331 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -5,6 +5,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-844 List instances tests (Paul Isaychuk via Ruslan Ostafiychuk) + FALCON-841 Test falcon process with different frequencies (Raghav Kumar Gautam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bcc9f9f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java ---------------------------------------------------------------------- diff 
--git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java index 9c5751f..7264142 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/interfaces/IEntityManagerHelper.java @@ -547,9 +547,26 @@ public abstract class IEntityManagerHelper { throws AuthenticationException, IOException, URISyntaxException { String url = createUrl(this.hostname + URLS.ENTITY_SUMMARY.getValue(), getEntityType(), clusterName); - if (!StringUtils.isEmpty(params)){ + if (StringUtils.isNotEmpty(params)) { url += "?" + params; } return Util.sendRequest(url, "get", null, null); } + + /** + * Get list of all instances of a given entity. + * @param entityName entity name + * @param params list of optional parameters + * @param user user name + * @return response + */ + public InstancesResult listInstances(String entityName, String params, String user) + throws AuthenticationException, IOException, URISyntaxException { + String url = createUrl(this.hostname + URLS.INSTANCE_LIST.getValue(), getEntityType(), + entityName + colo); + if (StringUtils.isNotEmpty(params)) { + url += colo.isEmpty() ? "?" 
+ params : "&" + params; + } + return (InstancesResult) InstanceUtil.sendRequestProcessInstance(url, user); + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bcc9f9f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java index 024a652..bd2aaf6 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java @@ -40,11 +40,24 @@ public final class HadoopUtil { public static final String SOMETHING_RANDOM = "somethingRandom"; private static final Logger LOGGER = Logger.getLogger(HadoopUtil.class); + private static Pattern protocol = Pattern.compile(":[\\d]+/"); private HadoopUtil() { throw new AssertionError("Instantiating utility class..."); } + /* + * Removes 'hdfs(hftp)://server:port' + */ + private static String cutProtocol(String path) { + if (StringUtils.isNotEmpty(path)) { + if (protocol.matcher(path).find()) { + return '/' + protocol.split(path)[1]; + } + } + return path; + } + /** * Retrieves all file names contained in a given directory. 
* @param fs filesystem @@ -299,13 +312,10 @@ public final class HadoopUtil { if (!remotePathPrefix.endsWith("/") && !remoteLocations.get(0).startsWith("/")) { remotePathPrefix += "/"; } - Pattern pattern = Pattern.compile(":[\\d]+/"); // remove 'hdfs(hftp)://server:port' List<String> locations = new ArrayList<String>(); for (String remoteDir : remoteLocations) { String remoteLocation = remotePathPrefix + remoteDir; - if (pattern.matcher(remoteLocation).find()) { - remoteLocation = remoteLocation.split(":[\\d]+")[1]; - } + remoteLocation = cutProtocol(remoteLocation); locations.add(remoteLocation); LOGGER.info(String.format("copying to: %s files: %s", fs.getUri() + remoteLocation, Arrays.toString(files))); @@ -381,7 +391,7 @@ public final class HadoopUtil { public static void createFolders(FileSystem fs, final String folderPrefix, List<String> folderList) throws IOException { for (final String folder : folderList) { - fs.mkdirs(new Path(folderPrefix + folder)); + fs.mkdirs(new Path(cutProtocol(folderPrefix + folder))); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bcc9f9f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java index 3a63e7c..f0c6db2 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java @@ -612,7 +612,8 @@ public final class Util { INSTANCE_SUSPEND("/api/instance/suspend"), INSTANCE_RERUN("/api/instance/rerun"), INSTANCE_SUMMARY("/api/instance/summary"), - INSTANCE_PARAMS("/api/instance/params"); + INSTANCE_PARAMS("/api/instance/params"), + INSTANCE_LIST("/api/instance/list"); 
private final String url; URLS(String url) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bcc9f9f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java new file mode 100644 index 0000000..47b3cb7 --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.regression.lineage; + +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.feed.ActionType; +import org.apache.falcon.entity.v0.feed.ClusterType; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.response.InstancesResult; +import org.apache.falcon.regression.core.util.AssertUtil; +import org.apache.falcon.regression.core.util.BundleUtil; +import org.apache.falcon.regression.core.util.HadoopUtil; +import org.apache.falcon.regression.core.util.InstanceUtil; +import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.TimeUtil; +import org.apache.falcon.regression.core.util.Util; +import org.apache.falcon.regression.core.util.XmlUtil; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.log4j.Logger; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.OozieClientException; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.testng.asserts.SoftAssert; + +import javax.xml.bind.JAXBException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +/** + * Testing the list instances api for feed. 
+ */ +@Test(groups = "embedded") +public class ListFeedInstancesTest extends BaseTestClass { + private static final Logger LOGGER = Logger.getLogger(ListFeedInstancesTest.class); + private ColoHelper cluster2 = servers.get(1); + private OozieClient cluster2OC = serverOC.get(1); + private String testDir = "/ListFeedInstancesTest"; + private String baseTestHDFSDir = baseHDFSDir + testDir; + private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; + private String sourcePath = baseTestHDFSDir + "/source"; + private String feedDataLocation = sourcePath + MINUTE_DATE_PATTERN; + private String targetPath = baseTestHDFSDir + "/target"; + private String targetDataLocation = targetPath + MINUTE_DATE_PATTERN; + private String startTime, endTime; + private String feedName; + + @BeforeClass(alwaysRun = true) + public void setUp() + throws IOException, OozieClientException, JAXBException, AuthenticationException, + URISyntaxException { + uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); + startTime = TimeUtil.getTimeWrtSystemTime(-55); + endTime = TimeUtil.getTimeWrtSystemTime(5); + LOGGER.info("Time range is between : " + startTime + " and " + endTime); + Bundle bundle = BundleUtil.readELBundle(); + for (int i = 0; i < 2; i++) { + bundles[i] = new Bundle(bundle, servers.get(i)); + bundles[i].generateUniqueBundle(); + } + prepareScenario(); + } + + /* + * Prepares running feed with instances ordered (desc): 1 waiting, 1 suspended, 1 running, + * 3 waiting and 6 killed. Testing is based on expected instances statuses. 
+ */ + private void prepareScenario() throws AuthenticationException, IOException, URISyntaxException, + JAXBException, OozieClientException { + bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes); + bundles[0].setInputFeedDataPath(feedDataLocation); + String feed = bundles[0].getInputFeedFromBundle(); + feedName = Util.readEntityName(feed); + String cluster1Def = bundles[0].getClusters().get(0); + String cluster2Def = bundles[1].getClusters().get(0); + //erase all clusters from feed definition + feed = InstanceUtil.setFeedCluster(feed, + XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"), + XmlUtil.createRetention("days(1000000)", ActionType.DELETE), null, + ClusterType.SOURCE, null); + //set cluster1 as source + feed = InstanceUtil.setFeedCluster(feed, + XmlUtil.createValidity(startTime, endTime), + XmlUtil.createRetention("days(1000000)", ActionType.DELETE), + Util.readEntityName(cluster1Def), ClusterType.SOURCE, null); + //set cluster2 as target + feed = InstanceUtil.setFeedCluster(feed, + XmlUtil.createValidity(startTime, endTime), + XmlUtil.createRetention("days(1000000)", ActionType.DELETE), + Util.readEntityName(cluster2Def), ClusterType.TARGET, null, targetDataLocation); + + //submit clusters + AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster1Def)); + AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster2Def)); + + //submit and schedule feed + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); + InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); + + //retrieve specific instances to rule them directly + List<CoordinatorAction> actions = null; + String bundleID = InstanceUtil.getSequenceBundleID(cluster2, feedName, EntityType.FEED, 0); + List<CoordinatorJob> coords = cluster2OC.getBundleJobInfo(bundleID).getCoordinators(); + for (CoordinatorJob coord : coords) { + CoordinatorJob temp = cluster2OC.getCoordJobInfo(coord.getId()); + actions = 
temp.getActions().size() == 12 ? temp.getActions() : null; + } + Assert.assertNotNull(actions, "Required coordinator not found."); + LOGGER.info(actions); + + //killing first 6 instances + String range; + InstancesResult r; + for (int i = 0; i < 6; i++) { + HadoopUtil.createFolders(serverFS.get(0), "", Arrays.asList(actions.get(i) + .getMissingDependencies().split("#"))); + //only running instance can be killed, so we should make it running and then kill it + InstanceUtil.waitTillInstanceReachState(cluster2OC, feedName, 1, + CoordinatorAction.Status.RUNNING, EntityType.FEED, 3); + range = "?start=" + TimeUtil.addMinsToTime(startTime, i * 5 - 1) + + "&end=" + TimeUtil.addMinsToTime(startTime, i * 5 + 1); + r = prism.getFeedHelper().getProcessInstanceKill(feedName, range); + InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1); + } + //wait for the 10th instance to run, then suspend it + HadoopUtil.createFolders(serverFS.get(0), "", Arrays.asList(actions.get(9) + .getMissingDependencies().split("#"))); + InstanceUtil.waitTillInstanceReachState(cluster2OC, feedName, 1, + CoordinatorAction.Status.RUNNING, EntityType.FEED, 3); + range = "?start=" + TimeUtil.addMinsToTime(endTime, -15) + + "&end=" + TimeUtil.addMinsToTime(endTime, -10); + r = prism.getFeedHelper().getProcessInstanceSuspend(feedName, range); + InstanceUtil.validateResponse(r, 1, 0, 1, 0, 0); + + //wait for the 11th instance to run + HadoopUtil.createFolders(serverFS.get(0), "", Arrays.asList(actions.get(10) + .getMissingDependencies().split("#"))); + InstanceUtil.waitTillInstanceReachState(cluster2OC, feedName, 1, + CoordinatorAction.Status.RUNNING, EntityType.FEED, 3); + + //check that the scenario works as expected. + r = prism.getFeedHelper().getProcessInstanceStatus(feedName, + "?start=" + startTime + "&numResults=12"); + InstanceUtil.validateResponse(r, 12, 1, 1, 4, 6); + } + + /** + * Test the list feed instances api using an orderBy parameter. Check the order.
+ */ + @Test + public void testFeedOrderBy() + throws URISyntaxException, OozieClientException, JAXBException, AuthenticationException, + IOException { + SoftAssert softAssert = new SoftAssert(); + //orderBy start time + InstancesResult r = prism.getFeedHelper().listInstances(feedName, + "orderBy=startTime&sortOrder=desc", null); + InstancesResult.Instance[] instances = r.getInstances(); + Date previousDate = new Date(); + for (InstancesResult.Instance instance : instances) { + Date current = instance.getStartTime(); + if (current != null) { //e.g if instance is WAITING it doesn't have start time + softAssert.assertTrue(current.before(previousDate) || current.equals(previousDate), + "Wrong order. Current startTime :" + current + " Previous: " + previousDate); + previousDate = (Date) current.clone(); + } + } + //orderBy status + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&numResults=12&orderBy=status&sortOrder=desc", null); + InstanceUtil.validateResponse(r, 12, 1, 1, 4, 6); + instances = r.getInstances(); + InstancesResult.WorkflowStatus previousStatus = InstancesResult.WorkflowStatus.WAITING; + for (InstancesResult.Instance instance : instances) { + InstancesResult.WorkflowStatus current = instance.getStatus(); + softAssert.assertTrue(current.compareTo(previousStatus) <= 0, + "Wrong order. Compared " + current + " and " + previousStatus + " statuses."); + previousStatus = current; + } + //sort by endTime + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&numResults=12&orderBy=endTime&sortOrder=desc", null); + instances = r.getInstances(); + previousDate = new Date(); + for (InstancesResult.Instance instance : instances) { + Date current = instance.getEndTime(); + if (current != null) { //e.g if instance is WAITING it doesn't have end time + softAssert.assertTrue(current.before(previousDate) || current.equals(previousDate), + "Wrong order. 
Current startTime :" + current + " Previous: " + previousDate); + previousDate = (Date) current.clone(); + } + } + softAssert.assertAll(); + } + + /** + * Test the list feed instance api using start/end parameters. Check instances number. + */ + @Test + public void testFeedStartEnd() + throws URISyntaxException, OozieClientException, JAXBException, AuthenticationException, + IOException { + //actual start/end values. + InstancesResult r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&end=" + endTime, null); + InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4); + + //without params, the default start/end should be applied. + r = prism.getFeedHelper().listInstances(feedName, null, null); + InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4); + + //increasing a -start, the -end stays the same. + r = prism.getFeedHelper().listInstances(feedName, + "start=" + TimeUtil.addMinsToTime(startTime, 6) + + "&end=" + TimeUtil.addMinsToTime(endTime, -5), null); + InstanceUtil.validateResponse(r, 9, 1, 1, 3, 4); + r = prism.getFeedHelper().listInstances(feedName, + "start=" + TimeUtil.addMinsToTime(startTime, 11) + + "&end=" + TimeUtil.addMinsToTime(endTime, -5), null); + InstanceUtil.validateResponse(r, 8, 1, 1, 3, 3); + r = prism.getFeedHelper().listInstances(feedName, + "start=" + TimeUtil.addMinsToTime(startTime, 16) + + "&end=" + TimeUtil.addMinsToTime(endTime, -5), null); + InstanceUtil.validateResponse(r, 7, 1, 1, 3, 2); + + //one instance between start/end, use instances with different statuses. 
+ r = prism.getFeedHelper().listInstances(feedName, + "start=" + TimeUtil.addMinsToTime(startTime, 12) + + "&end=" + TimeUtil.addMinsToTime(startTime, 16), null); + InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1); + r = prism.getFeedHelper().listInstances(feedName, + "start=" + TimeUtil.addMinsToTime(endTime, -5) + "&end=" + endTime, null); + InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0); + + //only start, actual feed startTime, should get 10 most recent instances(by default). + r = prism.getFeedHelper().listInstances(feedName, "start=" + startTime, null); + InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4); + + //only start, greater than the actual startTime. + r = prism.getFeedHelper().listInstances(feedName, + "start=" + TimeUtil.addMinsToTime(startTime, 16), null); + InstanceUtil.validateResponse(r, 8, 1, 1, 4, 2); + + //only end, 1 instance is expected + r = prism.getFeedHelper().listInstances(feedName, + "end=" + TimeUtil.addMinsToTime(startTime, 4), null); + InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1); + + //only the end, the 10 most recent instances are expected + r = prism.getFeedHelper().listInstances(feedName, "end=" + endTime, null); + InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4); + + //only the end + r = prism.getFeedHelper().listInstances(feedName, + "end=" + TimeUtil.addMinsToTime(endTime, -31), null); + InstanceUtil.validateResponse(r, 6, 0, 0, 0, 6); + r = prism.getFeedHelper().listInstances(feedName, + "end=" + TimeUtil.addMinsToTime(endTime, -21), null); + InstanceUtil.validateResponse(r, 8, 0, 0, 2, 6); + } + + /** + * List feed instances with -offset and -numResults params expecting the list of feed + * instances which start at the right offset and whose number matches the expected one. + */ + @Test + public void testFeedOffsetNumResults() + throws URISyntaxException, IOException, AuthenticationException { + //check the default value of the numResults param. Expecting 10 instances.
+ InstancesResult r = prism.getFeedHelper().listInstances(feedName, null, null); + InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4); + + //changing a value to 6. 6 instances are expected + r = prism.getFeedHelper().listInstances(feedName, "numResults=6", null); + InstanceUtil.validateResponse(r, 6, 1, 1, 4, 0); + + //use a start option without a numResults parameter. 10 instances are expected + r = prism.getFeedHelper().listInstances(feedName, "start=" + startTime, null); + InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4); + + //use a start option with a numResults value which is smaller than the default. + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&numResults=8", null); + InstanceUtil.validateResponse(r, 8, 1, 1, 4, 2); + + //use a start option with a numResults value greater than the default. + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&numResults=12", null); + InstanceUtil.validateResponse(r, 12, 1, 1, 4, 6); + + //get all instances + InstancesResult.Instance[] allInstances = r.getInstances(); + + //adding an offset param into request. Expected (total number - offset) instances.
+ int offset = 3; + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&offset=" + offset + "&numResults=12", null); + InstanceUtil.validateResponse(r, 9, 0, 0, 3, 6); + + //check that expected instances were retrieved + InstancesResult.Instance[] instances = r.getInstances(); + for (int i = 0; i < 9; i++) { + LOGGER.info("Comparing instances: " + instances[i] + " and " + allInstances[i + offset]); + Assert.assertTrue(instances[i].getInstance().equals(allInstances[i + offset].getInstance())); + } + //use different offset and numResults params in the request + offset = 6; + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&offset=" + offset + "&numResults=6", null); + InstanceUtil.validateResponse(r, 6, 0, 0, 0, 6); + + //check that expected instances are present in response + instances = r.getInstances(); + for (int i = 0; i < 6; i++) { + LOGGER.info("Comparing instances: " + instances[i] + " and " + allInstances[i + offset]); + Assert.assertTrue(instances[i].getInstance().equals(allInstances[i + offset].getInstance())); + } + } + + /** + * Test the list feed instances api using filterBy parameter. + */ + @Test + public void testFeedFilterBy() + throws OozieClientException, AuthenticationException, IOException, URISyntaxException { + //test with the filterBy status. + InstancesResult r = prism.getFeedHelper().listInstances(feedName, + "filterBy=STATUS:RUNNING", null); + InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0); + r = prism.getFeedHelper().listInstances(feedName, "filterBy=STATUS:WAITING", null); + InstanceUtil.validateResponse(r, 4, 0, 0, 4, 0); + + //get all instances. + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&numResults=12", null); + InstanceUtil.validateResponse(r, 12, 1, 1, 4, 6); + + //use different statuses, filterBy among all instances. 
+ r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&filterBy=STATUS:KILLED", null); + InstanceUtil.validateResponse(r, 6, 0, 0, 0, 6); + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&filterBy=STATUS:SUSPENDED", null); + InstanceUtil.validateResponse(r, 1, 0, 1, 0, 0); + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&filterBy=STATUS:RUNNING", null); + InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0); + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&filterBy=STATUS:WAITING", null); + InstanceUtil.validateResponse(r, 4, 0, 0, 4, 0); + + //use additional filters. + String sourceCluster = bundles[0].getClusterNames().get(0); + String clusterName = bundles[1].getClusterNames().get(0); + r = prism.getFeedHelper().listInstances(feedName, + "start=" + startTime + "&filterBy=CLUSTER:" + clusterName, null); + InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4); + r = prism.getFeedHelper().listInstances(feedName, "start=" + startTime + "&numResults=12" + + "&filterBy=SOURCECLUSTER:" + sourceCluster, null); + InstanceUtil.validateResponse(r, 8, 1, 1, 0, 6); + r = prism.getFeedHelper().listInstances(feedName, + "filterBy=SOURCECLUSTER:" + sourceCluster, null); + InstanceUtil.validateResponse(r, 6, 1, 1, 0, 4); + } + + /** + * Test list feed instances using custom filter. Expecting list of feed instances which + * satisfy custom filters. 
+ */ + @Test + public void testFeedCustomFilter() + throws URISyntaxException, IOException, AuthenticationException { + String params = "start=" + startTime + "&filterBy=status:RUNNING"; + InstancesResult r = prism.getFeedHelper().listInstances(feedName, params, null); + InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0); + + params = "start=" + startTime + "&end=" + endTime + "&filterBy=status:RUNNING&offset=2"; + r = prism.getFeedHelper().listInstances(feedName, params, null); + InstanceUtil.validateSuccessWOInstances(r); + + params = "start=" + startTime + "&end=" + endTime + "&filterBy=status:WAITING"; + r = prism.getFeedHelper().listInstances(feedName, params, null); + InstanceUtil.validateResponse(r, 4, 0, 0, 4, 0); + + params = "start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 41) + + "&filterBy=status:WAITING"; + r = prism.getFeedHelper().listInstances(feedName, params, null); + InstanceUtil.validateResponse(r, 3, 0, 0, 3, 0); + + params = "start=" + startTime + "&offset=1&numResults=1&filterBy=status:WAITING"; + r = prism.getFeedHelper().listInstances(feedName, params, null); + InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0); + + params = "start=" + TimeUtil.addMinsToTime(startTime, 16) + "&offset=2&numResults=12"; + r = prism.getFeedHelper().listInstances(feedName, params, null); + InstanceUtil.validateResponse(r, 6, 0, 1, 3, 2); + + String sourceCluster = bundles[0].getClusterNames().get(0); + String clusterName = bundles[1].getClusterNames().get(0); + params = "start=" + startTime + "&filterBy=STATUS:KILLED,CLUSTER:"+ clusterName + + "&numResults=5&orderBy=startTime&sortOrder=desc"; + r = prism.getFeedHelper().listInstances(feedName, params, null); + InstanceUtil.validateResponse(r, 5, 0, 0, 0, 5); + + //should be ordered by a start time + SoftAssert softAssert = new SoftAssert(); + InstancesResult.Instance[] instances = r.getInstances(); + Date previousDate = new Date(); + for (InstancesResult.Instance instance : instances) { + Date 
current = instance.getStartTime(); + softAssert.assertNotNull(current, "Start time shouldn't be null for KILLED instance."); + softAssert.assertTrue(current.before(previousDate) || current.equals(previousDate), + "Wrong order. Current startTime :" + current + " Previous: " + previousDate); + previousDate = (Date) current.clone(); + } + softAssert.assertAll(); + + //missing 1st, 11th, 12th instances, all other instances should be retrieved. + params = "start=" + TimeUtil.addMinsToTime(startTime, 2) + "&offset=2"; + r = prism.getFeedHelper().listInstances(feedName, params, null); + InstanceUtil.validateResponse(r, 9, 0, 1, 3, 5); + + //missing the 1st, 11th, 12th instance, all instances which have progressed should be present: + //5 killed + 1 suspended, but numResults=5, so expecting 1 suspended and 4 killed instances. + params = "start=" + TimeUtil.addMinsToTime(startTime, 2) + "&filterBy=SOURCECLUSTER:" + + sourceCluster + "&offset=2&numResults=5"; + r = prism.getFeedHelper().listInstances(feedName, params, null); + InstanceUtil.validateResponse(r, 5, 0, 1, 0, 4); + } + + @AfterClass(alwaysRun = true) + public void tearDown() throws IOException { + removeBundles(); + cleanTestDirs(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bcc9f9f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java new file mode 100644 index 0000000..3f5ce28 --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.regression.lineage; + +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.response.InstancesResult; +import org.apache.falcon.regression.core.util.BundleUtil; +import org.apache.falcon.regression.core.util.InstanceUtil; +import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; +import org.apache.falcon.regression.core.util.TimeUtil; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.log4j.Logger; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.OozieClient; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.testng.asserts.SoftAssert; + +import java.io.IOException; +import java.util.Date; + +/** + * Test list instances api for process. 
+ */ +@Test(groups = "embedded") +public class ListProcessInstancesTest extends BaseTestClass { + private static final Logger LOGGER = Logger.getLogger(ListProcessInstancesTest.class); + private ColoHelper cluster = servers.get(0); + private OozieClient clusterOC = serverOC.get(0); + private String testDir = "/ListProcessInstancesTest"; + private String baseTestHDFSDir = baseHDFSDir + testDir; + private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; + private String sourcePath = baseTestHDFSDir + "/source"; + private String feedDataLocation = sourcePath + MINUTE_DATE_PATTERN; + private String startTime, endTime; + private String processName; + + @BeforeClass(alwaysRun = true) + public void setUp() throws IOException { + uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); + startTime = TimeUtil.getTimeWrtSystemTime(-55); + endTime = TimeUtil.getTimeWrtSystemTime(5); + LOGGER.info("Time range is between : " + startTime + " and " + endTime); + } + + @BeforeMethod(alwaysRun = true) + public void prepareData() throws Exception { + bundles[0] = BundleUtil.readELBundle(); + bundles[0] = new Bundle(bundles[0], servers.get(0)); + bundles[0].generateUniqueBundle(); + //prepare process + bundles[0].setProcessWorkflow(aggregateWorkflowDir); + bundles[0].setInputFeedDataPath(feedDataLocation); + bundles[0].setProcessValidity(startTime, endTime); + bundles[0].setProcessConcurrency(3); + bundles[0].submitAndScheduleProcess(); + processName = bundles[0].getProcessName(); + InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + //create data for processes to run and wait some time for instances to make progress + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, + CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws IOException { + removeBundles(); 
+ } + + /** + * List process instances using orderBy - status, -startTime, -endTime params, expecting list of + * process instances in the right order. + */ + @Test + public void testProcessOrderBy() throws Exception { + SoftAssert softAssert = new SoftAssert(); + //orderBy startTime descending order + InstancesResult r = prism.getProcessHelper().listInstances(processName, + "orderBy=startTime&sortOrder=desc", null); + InstancesResult.Instance[] instances = r.getInstances(); + Date previousDate = new Date(); + for (InstancesResult.Instance instance : instances) { + Date current = instance.getStartTime(); + if (current != null) { //e.g if instance is WAITING it doesn't have start time + softAssert.assertTrue(current.before(previousDate) || current.equals(previousDate), + "Wrong order. Current startTime :" + current + " Previous: " + previousDate); + previousDate = (Date) current.clone(); + } + } + //suspend one instance and kill another one to create variety of statuses + r = prism.getProcessHelper().getProcessInstanceSuspend(processName, + "?start=" + TimeUtil.addMinsToTime(startTime, 4) + + "&end=" + TimeUtil.addMinsToTime(startTime, 8)); + InstanceUtil.validateResponse(r, 1, 0, 1, 0, 0); + r = prism.getProcessHelper().getProcessInstanceKill(processName, + "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 3)); + InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1); + //wait till instances status be stable + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, + CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3); + + //orderBy status ascending order + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&numResults=12&orderBy=status&sortOrder=desc", null); + InstanceUtil.validateResponse(r, 12, 3, 1, 7, 1); + instances = r.getInstances(); + InstancesResult.WorkflowStatus previousStatus = InstancesResult.WorkflowStatus.WAITING; + for (InstancesResult.Instance instance : instances) { + 
InstancesResult.WorkflowStatus current = instance.getStatus(); + softAssert.assertTrue(current.compareTo(previousStatus) <= 0, + "Wrong order. Compared " + current + " and " + previousStatus + " statuses."); + previousStatus = current; + } + //only instances which already have finished have endTime + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 10); + //sort by end time, descending order + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&numResults=12&orderBy=endTime&sortOrder=desc", null); + instances = r.getInstances(); + previousDate = new Date(); + for (InstancesResult.Instance instance : instances) { + Date current = instance.getEndTime(); + if (current != null) { //e.g if instance is WAITING it doesn't have end time + softAssert.assertTrue(current.before(previousDate) || current.equals(previousDate), + "Wrong order. Current startTime :" + current + " Previous: " + previousDate); + previousDate = (Date) current.clone(); + } + } + softAssert.assertAll(); + } + + /** + * List process instances using -offset and -numResults params expecting list of process + * instances to start at the right offset and give expected number of instances. + */ + @Test + public void testProcessOffsetNumResults() throws Exception { + //check default number. Should be 10. + InstancesResult r = prism.getProcessHelper().listInstances(processName, null, null); + InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0); + + //change to 6 expected 6 + r = prism.getProcessHelper().listInstances(processName, "numResults=6", null); + InstanceUtil.validateResponse(r, 6, 0, 0, 6, 0); + + //use start option without numResults. 10 instances expected + r = prism.getProcessHelper().listInstances(processName, "start=" + startTime, null); + InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0); + + //use start option with numResults value which is smaller then default. 
+ r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&numResults=8", null); + InstanceUtil.validateResponse(r, 8, 0, 0, 8, 0); + + //use start option with numResults value greater then default. All 12 instances expected + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&numResults=12", null); + InstanceUtil.validateResponse(r, 12, 3, 0, 9, 0); + + //get all instances + InstancesResult.Instance[] allInstances = r.getInstances(); + + //adding offset param into request. Expected (total number - offset) instances. + int offset = 3; + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&offset=" + offset + "&numResults=12", null); + InstanceUtil.validateResponse(r, 9, 3, 0, 6, 0); + + //check that expected instances were retrieved + InstancesResult.Instance[] instances = r.getInstances(); + for (int i = 0; i < 9; i++) { + LOGGER.info("Comparing instances: " + instances[i] + " and " + allInstances[i + offset]); + Assert.assertTrue(instances[i].getInstance().equals(allInstances[i + offset].getInstance())); + } + //use different offset and numResults params in request + offset = 6; + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&offset=" + offset + "&numResults=6", null); + InstanceUtil.validateResponse(r, 6, 3, 0, 3, 0); + + //check that expected instance are contained in response + instances = r.getInstances(); + for (int i = 0; i < 6; i++) { + LOGGER.info("Comparing instances: " + instances[i] + " and " + allInstances[i + offset]); + Assert.assertTrue(instances[i].getInstance().equals(allInstances[i + offset].getInstance())); + } + } + + /** + * List process instances using -filterBy param. Expecting list of process instances + * which have the given status. 
+ */ + @Test + public void testProcessFilterBy() throws Exception { + //test with simple filters + InstancesResult r = prism.getProcessHelper().listInstances(processName, + "filterBy=STATUS:RUNNING", null); + //it gets 10 most recent instances, in our case only the oldest will be RUNNING + InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0); + r = prism.getProcessHelper().listInstances(processName, "filterBy=STATUS:WAITING", null); + InstanceUtil.validateResponse(r, 9, 0, 0, 9, 0); + + //get all instances + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&numResults=12", null); + InstanceUtil.validateResponse(r, 12, 3, 0, 9, 0); + + //suspend one instance and kill another to create variety of statuses + r = prism.getProcessHelper().getProcessInstanceSuspend(processName, + "?start=" + TimeUtil.addMinsToTime(startTime, 4) + + "&end=" + TimeUtil.addMinsToTime(startTime, 8)); + InstanceUtil.validateResponse(r, 1, 0, 1, 0, 0); + r = prism.getProcessHelper().getProcessInstanceKill(processName, + "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 3)); + InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1); + + //wait till new instances be RUNNING and total status count be stable + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, + CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3); + + //get all instances + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&numResults=12", null); + InstanceUtil.validateResponse(r, 12, 3, 1, 7, 1); + + //use different statuses, filterBy among all instances + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&filterBy=STATUS:KILLED", null); + InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1); + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&filterBy=STATUS:SUSPENDED", null); + InstanceUtil.validateResponse(r, 1, 0, 1, 0, 0); + r = 
prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&filterBy=STATUS:RUNNING", null); + InstanceUtil.validateResponse(r, 3, 3, 0, 0, 0); + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&filterBy=STATUS:WAITING", null); + InstanceUtil.validateResponse(r, 7, 0, 0, 7, 0); + } + + /** + * List process instances using start/end filter. Expecting list of process instances which + * satisfy start and end filters. + */ + @Test + public void testProcessStartEnd() throws Exception { + InstancesResult r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&end=" + endTime, null); + InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0); + + //without params, default start/end should be used + r = prism.getProcessHelper().listInstances(processName, null, null); + InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0); + + //increasing -start, -end stays the same + r = prism.getProcessHelper().listInstances(processName, + "start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 29), null); + InstanceUtil.validateResponse(r, 6, 3, 0, 3, 0); + r = prism.getProcessHelper().listInstances(processName, + "start=" + TimeUtil.addMinsToTime(startTime, 4) + + "&end=" + TimeUtil.addMinsToTime(startTime, 29), null); + InstanceUtil.validateResponse(r, 5, 2, 0, 3, 0); + r = prism.getProcessHelper().listInstances(processName, + "start=" + TimeUtil.addMinsToTime(startTime, 9) + + "&end=" + TimeUtil.addMinsToTime(startTime, 29), null); + InstanceUtil.validateResponse(r, 4, 1, 0, 3, 0); + + //one instance between start/end + r = prism.getProcessHelper().listInstances(processName, + "start=" + TimeUtil.addMinsToTime(startTime, 12) + + "&end=" + TimeUtil.addMinsToTime(startTime, 16), null); + InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0); + + //only start, actual startTime, should get 10 most recent instances + r = prism.getProcessHelper().listInstances(processName, "start=" + startTime, null); + 
InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0); + + //only start, greater then actual startTime + r = prism.getProcessHelper().listInstances(processName, + "start=" + TimeUtil.addMinsToTime(startTime, 19), null); + InstanceUtil.validateResponse(r, 8, 0, 0, 8, 0); + + //only end, 1 instance + r = prism.getProcessHelper().listInstances(processName, + "end=" + TimeUtil.addMinsToTime(startTime, 4), null); + InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0); + + //only end, 10 the most recent instances + r = prism.getProcessHelper().listInstances(processName, + "end=" + endTime, null); + InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0); + + //only end, middle value + r = prism.getProcessHelper().listInstances(processName, + "end=" + TimeUtil.addMinsToTime(endTime, -31), null); + InstanceUtil.validateResponse(r, 6, 3, 0, 3, 0); + } + + /** + * Test list process instances using custom filter. Expecting list of process instances which + * satisfy custom filters. + */ + @Test + public void testProcessCustomFilter() throws Exception { + String params = "start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 26) + + "&filterBy=status:RUNNING"; + InstancesResult r = prism.getProcessHelper().listInstances(processName, params, null); + InstanceUtil.validateResponse(r, 3, 3, 0, 0, 0); + + params = "start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 21) + + "&filterBy=status:WAITING"; + r = prism.getProcessHelper().listInstances(processName, params, null); + InstanceUtil.validateResponse(r, 2, 0, 0, 2, 0); + + params = "start=" + startTime + "&filterBy=status:WAITING&offset=2&numResult=12"; + r = prism.getProcessHelper().listInstances(processName, params, null); + InstanceUtil.validateResponse(r, 7, 0, 0, 7, 0); + + params = "start=" + TimeUtil.addMinsToTime(startTime, 16) + "&filterBy=STATUS:WAITING,CLUSTER:" + + bundles[0].getClusterNames().get(0) + "&offset=4&numResult=7&sortOrder=desc"; + r = prism.getProcessHelper().listInstances(processName, params, 
null); + InstanceUtil.validateResponse(r, 4, 0, 0, 4, 0); + + params = "start=" + TimeUtil.addMinsToTime(startTime, 7) + "&filterBy=CLUSTER:" + + bundles[0].getClusterNames().get(0) + "&offset=4&numResult=7&sortOrder=asc"; + r = prism.getProcessHelper().listInstances(processName, params, null); + InstanceUtil.validateResponse(r, 6, 1, 0, 5, 0); + } + + @AfterClass(alwaysRun = true) + public void cleanUp() throws IOException { + cleanTestDirs(); + } +}