http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/search/SearchPage.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/search/SearchPage.java b/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/search/SearchPage.java
deleted file mode 100644
index 15d50b0..0000000
--- a/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/search/SearchPage.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression.ui.search;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.util.UIAssert;
-import org.apache.log4j.Logger;
-import org.openqa.selenium.By;
-import org.openqa.selenium.Keys;
-import org.openqa.selenium.WebDriver;
-import org.openqa.selenium.WebElement;
-import org.openqa.selenium.support.FindBy;
-import org.openqa.selenium.support.FindBys;
-import org.openqa.selenium.support.PageFactory;
-import org.testng.Assert;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-
-/** Page object for the Search Page.
*/ -public class SearchPage extends AbstractSearchPage { - - private static final String CLASS_OF_SELECTED_ROW = "rowSelected"; - - private static final Logger LOGGER = Logger.getLogger(SearchPage.class); - - public SearchPage(WebDriver driver) { - super(driver); - } - - - @FindBys({ - @FindBy(className = "mainUIView"), - @FindBy(className = "searchBoxContainer") - }) - private WebElement searchBlock; - - @FindBys({ - @FindBy(className = "mainUIView"), - @FindBy(className = "dashboardBox") - }) - private WebElement resultBlock; - - @FindBys({ - @FindBy(className = "mainUIView"), - @FindBy(className = "dashboardBox"), - @FindBy(tagName = "thead") - }) - private WebElement resultHeader; - - @FindBys({ - @FindBy(className = "mainUIView"), - @FindBy(className = "dashboardBox"), - @FindBy(className = "buttonRow") - }) - private WebElement resultButtons; - - private List<WebElement> getSearchResultElements() { - return resultBlock.findElements(By.className("entityRow")); - } - - public List<SearchResult> getSearchResults() { - List<SearchResult> searchResults = new ArrayList<>(); - for (WebElement oneResultElement : getSearchResultElements()) { - final List<WebElement> resultParts = oneResultElement.findElements(By.tagName("td")); - final String entityName = resultParts.get(1).getText(); - final SearchResult searchResult = SearchResult.create(entityName); - - final String[] allClasses = oneResultElement.getAttribute("class").split(" "); - if (Arrays.asList(allClasses).contains(CLASS_OF_SELECTED_ROW)) { - searchResult.withChecked(true); - } - - final String tags = resultParts.get(2).getText(); - searchResult.withTags(tags); - - final String clusterName = resultParts.get(3).getText(); - searchResult.withClusterName(clusterName); - - final String type = resultParts.get(4).getText(); - searchResult.withType(type); - - final String status = resultParts.get(5).getText(); - searchResult.withStatus(status); - searchResults.add(searchResult); - } - return searchResults; - } - - - public EntityPage openEntityPage(String entityName) { - return click(doSearch(entityName).get(0)); - } - - public EntityPage click(SearchResult result) { - LOGGER.info("attempting to click: " + result + " on search page."); - for (WebElement oneResultElement : getSearchResultElements()) { - final List<WebElement> resultParts = oneResultElement.findElements(By.tagName("td")); - final WebElement entityNameElement = resultParts.get(1); - final String entityName = entityNameElement.getText(); - if (entityName.equals(result.getEntityName())) { - entityNameElement.findElement(By.tagName("button")).click(); - return PageFactory.initElements(driver, EntityPage.class); - } - } - return null; - } - - @Override - public void checkPage() { - UIAssert.assertDisplayed(searchBlock, "Cluster box"); - } - - private WebElement getSearchBox() { - return searchBlock.findElement(By.className("input")); - } - - public List<SearchResult> doSearch(String searchString) { - clearSearch(); - return appendAndSearch(searchString); - } - - public List<SearchResult> appendAndSearch(String appendedPart) { - for(String queryParam : appendedPart.split("\\s+")) { - focusOnSearchBox(); - getSearchBox().sendKeys(queryParam); - getSearchBox().sendKeys(Keys.ENTER); - } - waitForAngularToFinish(); - if (resultBlock.isDisplayed()) { - return getSearchResults(); - } else { - return Collections.emptyList(); - } - - } - - public SearchQuery getSearchQuery() { - return new SearchQuery(searchBlock); - } - - public void clearSearch() { - focusOnSearchBox(); - 
getSearchBox().clear(); - SearchQuery query = getSearchQuery(); - for (int i = 0; i < query.getElementsNumber(); i++) { - removeLastParam(); - } - } - - public void removeLastParam() { - focusOnSearchBox(); - getSearchBox().sendKeys(Keys.BACK_SPACE); - getSearchBox().sendKeys(Keys.BACK_SPACE); - } - - private void focusOnSearchBox() { - driver.findElement(By.className("tags")).click(); - } - - public void checkNoResult() { - UIAssert.assertNotDisplayed(resultBlock, "Search result block"); - } - - public void selectRow(int row) { - changeRowClickedStatus(row, true); - } - - public void deselectRow(int row) { - changeRowClickedStatus(row, false); - } - - private void changeRowClickedStatus(int row, boolean checked) { - WebElement checkboxBlock = resultBlock.findElements(By.className("entityRow")).get(row - 1); - if (checked != checkboxBlock.getAttribute("class").contains(CLASS_OF_SELECTED_ROW)) { - checkboxBlock.findElement(By.xpath("./td/input")).click(); - } - } - - public void clickSelectAll() { - resultBlock.findElement(By.xpath(".//input[@ng-model='selectedAll']")).click(); - } - - - /** Class representing search query displayed in the search box. */ - public static final class SearchQuery { - private WebElement searchBlock; - private String name; - private String type; - private int elementsNumber; - private final List<String> tags = new ArrayList<>(); - private static final Logger LOGGER = Logger.getLogger(SearchQuery.class); - - public SearchQuery(WebElement searchBlock) { - this.searchBlock = searchBlock; - updateElements(); - } - - private SearchQuery updateElements() { - name = null; - tags.clear(); - final WebElement queryGroup = searchBlock.findElement(By.className("tag-list")); - final List<WebElement> queryParts = queryGroup.findElements(By.tagName("li")); - elementsNumber = queryParts.size(); - for (WebElement queryPart : queryParts) { - final WebElement queryLabel = queryPart.findElement(By.tagName("strong")); - final String queryText = queryPart.findElement(By.tagName("span")).getText(); - switch (queryLabel.getText().trim()) { - case "Name:": - if (name != null) { - LOGGER.warn(String.format("NAME block is already added: '%s' => '%s'", - name, queryText)); - } - name = queryText; - break; - case "Tag:": - tags.add(queryText); - break; - default: - } - } - return this; - } - - - public String getName() { - return name; - } - - public List<String> getTags() { - return tags; - } - - public int getElementsNumber() { - return elementsNumber; - } - - /** - * Delete element by index (1, 2, 3,..). - * @param index of element in search query. - * @return true if deletion was successful - */ - public boolean deleteByIndex(int index) { - if (index > elementsNumber || index < 1) { - LOGGER.warn("There is no element with index=" + index); - return false; - } - int oldElementsNumber = elementsNumber; - final WebElement queryGroup = searchBlock.findElement(By.className("tag-list")); - final List<WebElement> queryParts = queryGroup.findElements(By.tagName("li")); - queryParts.get(index - 1).findElement(By.className("remove-button")).click(); - this.updateElements(); - boolean result = oldElementsNumber == elementsNumber + 1; - LOGGER.info(String.format( - "Element with index=%d was%s deleted", index, result ? 
"" : "n't")); - return result; - } - - public boolean deleteLast() { - return deleteByIndex(elementsNumber); - } - } - - public Set<Button> getButtons(boolean active) { - List<WebElement> buttons = resultBlock.findElement(By.className("buttonsRow")) - .findElements(By.className("btn")); - Set<Button> result = EnumSet.noneOf(Button.class); - for (WebElement button : buttons) { - if ((button.getAttribute("disabled") == null) == active) { - result.add(Button.valueOf(button.getText())); - } - } - return result; - } - - public void clickButton(Button button) { - resultBlock.findElement(By.className("buttonsRow")) - .findElements(By.className("btn")).get(button.ordinal()).click(); - waitForAngularToFinish(); - } - - /** - * Buttons available for entities in result box. - */ - public enum Button { - Schedule, - Resume, - Suspend, - Edit, - Copy, - Delete, - XML - } - - /** Class representing search result displayed on the entity table page. */ - public static final class SearchResult { - private boolean isChecked = false; - private String entityName; - private String tags = ""; - private String clusterName; - private String type; - private EntityStatus status; - - public static SearchResult create(String entityName) { - return new SearchResult(entityName); - } - - public SearchResult withChecked(boolean pIsChecked) { - this.isChecked = pIsChecked; - return this; - } - - private SearchResult(String entityName) { - this.entityName = entityName; - } - - public SearchResult withTags(String pTags) { - this.tags = pTags; - return this; - } - - public SearchResult withClusterName(String pClusterName) { - this.clusterName = pClusterName; - return this; - } - - public SearchResult withType(String pType) { - this.type = pType; - return this; - } - - public SearchResult withStatus(String pStatus) { - this.status = EntityStatus.valueOf(pStatus); - return this; - } - - public boolean isChecked() { - return isChecked; - } - - public String getEntityName() { - return entityName; - } - - public String getTags() { - return tags; - } - - public String getClusterName() { - Assert.assertFalse(clusterName.contains(","), "getClusterName() called" - + " in multi-cluster setup: " + clusterName + ", maybe use getClusterNames()"); - return clusterName; - } - - public List<String> getClusterNames() { - return Arrays.asList(clusterName.split(",")); - } - - - public String getType() { - return type; - } - - public EntityStatus getStatus() { - return status; - } - - @Override - public String toString() { - return "SearchResult{" - + "isChecked=" + isChecked - + ", entityName='" + entityName + '\'' - + ", tags='" + tags + '\'' - + ", clusterName='" + clusterName + '\'' - + ", type='" + type + '\'' - + ", status='" + status + '\'' - + '}'; - } - - public static void assertEqual(List<SearchResult> searchResults, - List<Entity> expectedEntities, String errorMessage) { - Assert.assertEquals(searchResults.size(), expectedEntities.size(), errorMessage - + "(Length of lists don't match, searchResults: " + searchResults - + " expectedEntities: " + expectedEntities + ")"); - for (Entity entity : expectedEntities) { - boolean found = false; - for (SearchResult result : searchResults) { - //entities are same if they have same name & type - if (entity.getName().equals(result.entityName)) { - //entity type in SearchResults has a different meaning - //so, not comparing entity types - - //equality of cluster names - List<String> entityClusters = null; - switch (entity.getEntityType()) { - case FEED: - final FeedMerlin feed = (FeedMerlin) 
entity; - entityClusters = feed.getClusterNames(); - // tags equality check - Assert.assertEquals(result.getTags(), - StringUtils.trimToEmpty(feed.getTags()), - errorMessage + "(tags mismatch: " + result.entityName - + " & " + entity.toShortString() + ")"); - break; - case PROCESS: - final ProcessMerlin process = (ProcessMerlin) entity; - entityClusters = process.getClusterNames(); - // tags equality check - Assert.assertEquals(result.getTags(), - StringUtils.trimToEmpty(process.getTags()), - errorMessage + "(tags mismatch: " + result.entityName - + " & " + entity.toShortString() + ")"); - break; - default: - Assert.fail("Cluster entity is unexpected: " + entity); - break; - } - Collections.sort(entityClusters); - final List<String> actualClusters = result.getClusterNames(); - Collections.sort(actualClusters); - Assert.assertEquals(actualClusters, entityClusters, errorMessage - + "(cluster names mismatch: " + result + " " + entity + ")"); - found = true; - } - } - Assert.assertTrue(found, - "Entity: " + entity.toShortString() + " not found in: " + searchResults); - } - } - - } - -}
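As context for the deletion above: a minimal sketch of how a regression test might have driven this page object. The WebDriver setup, UI address, and test class name here are hypothetical; SearchPage, checkPage(), doSearch(), and SearchResult are the APIs from the deleted file.

import org.apache.falcon.regression.ui.search.SearchPage;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.firefox.FirefoxDriver;
import org.openqa.selenium.support.PageFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.List;

/** Hypothetical usage sketch for the SearchPage page object deleted above. */
public class SearchPageUsageSketch {

    @Test
    public void searchFindsEntityByName() {
        WebDriver driver = new FirefoxDriver();          // assumption: any WebDriver implementation works
        try {
            driver.get("http://falcon-ui-host:15000/");  // hypothetical Falcon UI address
            // PageFactory wires up the @FindBys-annotated fields, just as
            // SearchPage.click() does internally for EntityPage.
            SearchPage searchPage = PageFactory.initElements(driver, SearchPage.class);
            searchPage.checkPage();

            // doSearch() clears the search box, submits each whitespace-separated
            // term, and returns the parsed rows of the result table.
            List<SearchPage.SearchResult> results = searchPage.doSearch("my-process-name");
            Assert.assertFalse(results.isEmpty(), "expected at least one result");
            Assert.assertEquals(results.get(0).getEntityName(), "my-process-name");
        } finally {
            driver.quit();
        }
    }
}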
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/main/resources/errorMapping.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/main/resources/errorMapping.properties b/falcon-regression/merlin/src/main/resources/errorMapping.properties
deleted file mode 100644
index a6adf51..0000000
--- a/falcon-regression/merlin/src/main/resources/errorMapping.properties
+++ /dev/null
@@ -1,26 +0,0 @@
-##
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-NoRetrySpecified.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 54; columnNumber: 67; cvc-complex-type.2.4.a: Invalid content was found starting with element 'late-process'. One of '{retry}' is expected.]
-noConcurrencyParam.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 29; columnNumber: 16; cvc-complex-type.2.4.a: Invalid content was found starting with element 'execution'. One of '{concurrency}' is expected.]
-noExecutionSpecified.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 29; columnNumber: 16; cvc-complex-type.2.4.a: Invalid content was found starting with element 'frequency'. One of '{execution}' is expected.]
-NoWorkflowParams.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 52; columnNumber: 71; cvc-complex-type.2.4.a: Invalid content was found starting with element 'retry'. One of '{workflow}' is expected.]
-process-invalid.xml=javax.xml.bind.UnmarshalException - with linked exception:[org.xml.sax.SAXParseException; lineNumber: 2; columnNumber: 72; cvc-elt.1: Cannot find the declaration of element 'Process'.]
-inValid01_sameName.xml=inValid01_sameName already exists
-inValid02_sameName.xml=inValid02_sameName already exists
-

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/main/resources/log4testng.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/main/resources/log4testng.properties b/falcon-regression/merlin/src/main/resources/log4testng.properties
deleted file mode 100644
index 34b4bde..0000000
--- a/falcon-regression/merlin/src/main/resources/log4testng.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-##
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# log4testng will log its own behavior (generally used for debugging this package only).
- log4testng.debug=false
-
- # Specifies the root Loggers logging level. Will log DEBUG level and above
- log4testng.rootLogger=DEBUG
-
- # The org.testng.reporters.EmailableReporter Logger will log TRACE level and above
- log4testng.logger.org.testng.reporters.EmailableReporter=TRACE
-
- # All Logger in packages below org.testng will log WARN level and above
- log4testng.logger.org.testng=INFO
-

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
deleted file mode 100644
index 714a21f..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java
+++ /dev/null
@@ -1,772 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.Entities.ProcessMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.core.util.OozieUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.Util;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.oozie.client.BundleJob;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.apache.log4j.Logger;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.List;
-
-/**
- * Tests for authorization in Falcon.
- */
-@Test(groups = "embedded")
-public class AuthorizationTest extends BaseTestClass {
-    private static final Logger LOGGER = Logger.getLogger(AuthorizationTest.class);
-
-    private ColoHelper cluster = servers.get(0);
-    private FileSystem clusterFS = serverFS.get(0);
-    private OozieClient clusterOC = serverOC.get(0);
-    private String baseTestDir = cleanAndGetTestDir();
-    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
-    private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN;
-
-    @BeforeClass(alwaysRun = true)
-    public void uploadWorkflow() throws Exception {
-        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        Bundle bundle = BundleUtil.readELBundle();
-        bundles[0] = new Bundle(bundle, cluster);
-        bundles[0].generateUniqueBundle(this);
-        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
-    }
-
-    /**
-     * U2Delete test cases.
- */ - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SubmitU2DeleteCluster() throws Exception { - bundles[0].submitClusters(prism); - final ServiceResponse serviceResponse = prism.getClusterHelper().delete( - bundles[0].getClusters().get(0), MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Entity submitted by first user should not be deletable by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SubmitU2DeleteProcess() throws Exception { - bundles[0].submitClusters(prism); - bundles[0].submitProcess(true); - final ServiceResponse serviceResponse = prism.getProcessHelper().delete( - bundles[0].getProcessData(), MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Entity submitted by first user should not be deletable by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SubmitU2DeleteFeed() throws Exception { - bundles[0].submitClusters(prism); - bundles[0].submitFeed(); - final ServiceResponse serviceResponse = prism.getFeedHelper().delete( - bundles[0].getDataSets().get(0), MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Entity submitted by first user should not be deletable by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleU2DeleteProcess() - throws Exception { - //submit, schedule process by U1 - bundles[0].submitFeedsScheduleProcess(prism); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), - Job.Status.RUNNING); - //try to delete process by U2 - final ServiceResponse serviceResponse = prism.getProcessHelper() - .delete(bundles[0].getProcessData(), MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Process scheduled by first user should not be deleted by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleU2DeleteFeed() throws Exception { - String feed = bundles[0].getInputFeedFromBundle(); - //submit, schedule feed by U1 - bundles[0].submitClusters(prism); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - //delete feed by U2 - final ServiceResponse serviceResponse = prism.getFeedHelper().delete(feed, MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Feed scheduled by first user should not be deleted by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SuspendU2DeleteProcess() throws Exception { - //submit, schedule, suspend process by U1 - bundles[0].submitFeedsScheduleProcess(prism); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), - Job.Status.RUNNING); - 
AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(bundles[0].getProcessData())); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), - Job.Status.SUSPENDED); - //try to delete process by U2 - final ServiceResponse serviceResponse = prism.getProcessHelper() - .delete(bundles[0].getProcessData(), MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Process suspended by first user should not be deleted by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SuspendU2DeleteFeed() throws Exception { - String feed = bundles[0].getInputFeedFromBundle(); - //submit, schedule, suspend feed by U1 - bundles[0].submitClusters(prism); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); - AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED); - //delete feed by U2 - final ServiceResponse serviceResponse = prism.getFeedHelper() - .delete(feed, MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Feed scheduled by first user should not be deleted by second user"); - } - - /** - * U2Suspend test cases. - */ - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleU2SuspendFeed() throws Exception { - String feed = bundles[0].getInputFeedFromBundle(); - //submit, schedule by U1 - bundles[0].submitClusters(prism); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - //try to suspend by U2 - final ServiceResponse serviceResponse = prism.getFeedHelper() - .suspend(feed, MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Feed scheduled by first user should not be suspended by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleU2SuspendProcess() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), - Job.Status.RUNNING); - //try to suspend process by U2 - final ServiceResponse serviceResponse = prism.getProcessHelper() - .suspend(bundles[0].getProcessData(), MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Process scheduled by first user should not be suspended by second user"); - } - - /** - * U2Resume test cases. 
- */ - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SuspendU2ResumeFeed() throws Exception { - String feed = bundles[0].getInputFeedFromBundle(); - //submit, schedule and then suspend feed by User1 - bundles[0].submitClusters(prism); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); - AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.SUSPENDED); - //try to resume feed by User2 - final ServiceResponse serviceResponse = prism.getFeedHelper() - .resume(feed, MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Feed suspended by first user should not be resumed by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SuspendU2ResumeProcess() throws Exception { - //submit, schedule, suspend process by U1 - bundles[0].submitFeedsScheduleProcess(prism); - AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(bundles[0].getProcessData())); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), - Job.Status.SUSPENDED); - //try to resume process by U2 - final ServiceResponse serviceResponse = prism.getProcessHelper() - .resume(bundles[0].getProcessData(), MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Process suspended by first user should not be resumed by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SuspendU2ResumeProcessInstances() throws Exception { - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 5); - String midTime = TimeUtil.addMinsToTime(startTime, 2); - LOGGER.info("Start time: " + startTime + "\tEnd time: " + endTime); - - //prepare process definition - bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes); - bundles[0].setProcessConcurrency(5); - bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setProcessInput("now(0,0)", "now(0,4)"); - - //provide necessary data for first 3 instances to run - LOGGER.info("Creating necessary data..."); - String prefix = bundles[0].getFeedDataPathPrefix(); - HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS); - List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide( - TimeUtil.addMinsToTime(startTime, -2), endTime, 0); - HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates); - - //submit, schedule process by U1 - LOGGER.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData())); - bundles[0].submitFeedsScheduleProcess(prism); - - //check that there are 3 running instances - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS); - - //check that there are 2 waiting instances - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2, - CoordinatorAction.Status.WAITING, EntityType.PROCESS); - - //3 instances should be running , other 2 should be waiting - InstancesResult r = 
prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + endTime); - InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0); - - //suspend 3 running instances - r = prism.getProcessHelper().getProcessInstanceSuspend(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + midTime); - InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0); - - //try to resume suspended instances by U2 - r = prism.getProcessHelper().getProcessInstanceResume(bundles[0].getProcessName(), "?start=" + startTime - + "&end=" + midTime, MerlinConstants.USER2_NAME); - - //the state of above 3 instances should still be suspended - InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0); - - //check the status of all instances - r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + endTime); - InstanceUtil.validateResponse(r, 5, 0, 3, 2, 0); - } - - /** - * U2Kill test cases. - */ - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleU2KillProcessInstances() throws Exception { - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 5); - LOGGER.info("Start time: " + startTime + "\tEnd time: " + endTime); - - //prepare process definition - bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes); - bundles[0].setProcessConcurrency(5); - bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setProcessInput("now(0,0)", "now(0,4)"); - - //provide necessary data for first 3 instances to run - LOGGER.info("Creating necessary data..."); - String prefix = bundles[0].getFeedDataPathPrefix(); - HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS); - List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide( - TimeUtil.addMinsToTime(startTime, -2), endTime, 0); - HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates); - - //submit, schedule process by U1 - LOGGER.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData())); - bundles[0].submitFeedsScheduleProcess(prism); - - //check that there are 3 running instances - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS); - - //3 instances should be running , other 2 should be waiting - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + endTime); - InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0); - - //try to kill all instances by U2 - r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME); - - //number of instances should be the same as before - InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SuspendU2KillProcessInstances() throws Exception { - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 5); - String midTime = TimeUtil.addMinsToTime(startTime, 2); - LOGGER.info("Start time: " + startTime + "\tEnd time: " + endTime); - - //prepare process definition 
- bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes); - bundles[0].setProcessConcurrency(5); - bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setProcessInput("now(0,0)", "now(0,4)"); - - //provide necessary data for first 3 instances to run - LOGGER.info("Creating necessary data..."); - String prefix = bundles[0].getFeedDataPathPrefix(); - HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS); - List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide( - TimeUtil.addMinsToTime(startTime, -2), endTime, 0); - HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates); - - //submit, schedule process by U1 - LOGGER.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData())); - bundles[0].submitFeedsScheduleProcess(prism); - - //check that there are 3 running instances - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS); - - //check that there are 2 waiting instances - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2, - CoordinatorAction.Status.WAITING, EntityType.PROCESS); - - //3 instances should be running , other 2 should be waiting - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + endTime); - InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0); - - //suspend 3 running instances - r = prism.getProcessHelper().getProcessInstanceSuspend(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + midTime); - InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0); - - //try to kill all instances by U2 - r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME); - - //3 should still be suspended, 2 should be waiting - InstanceUtil.validateResponse(r, 5, 0, 3, 2, 0); - } - - /** - * U2Rerun test cases. 
- */ - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1KillSomeU2RerunAllProcessInstances() - throws IOException, JAXBException, AuthenticationException, URISyntaxException, - OozieClientException, InterruptedException { - String startTime = TimeUtil - .getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 5); - String midTime = TimeUtil.addMinsToTime(startTime, 2); - LOGGER.info("Start time: " + startTime + "\tEnd time: " + endTime); - - //prepare process definition - bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.minutes); - bundles[0].setProcessConcurrency(5); - bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.minutes); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setProcessInput("now(0,0)", "now(0,3)"); - - //provide necessary data for first 4 instances to run - LOGGER.info("Creating necessary data..."); - String prefix = bundles[0].getFeedDataPathPrefix(); - HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS); - List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide( - TimeUtil.addMinsToTime(startTime, -2), endTime, 0); - HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates); - - //submit, schedule process by U1 - LOGGER.info("Process data: " + Util.prettyPrintXml(bundles[0].getProcessData())); - bundles[0].submitFeedsScheduleProcess(prism); - - //check that there are 4 running instances - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 4, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS); - - //4 instances should be running , 1 should be waiting - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + endTime); - InstanceUtil.validateResponse(r, 5, 4, 0, 1, 0); - - //kill 3 running instances - r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + midTime); - InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); - - //generally 3 instances should be killed, 1 is running and 1 is waiting - - //try to rerun instances by U2 - r = prism.getProcessHelper().getProcessInstanceRerun(bundles[0].getProcessName(), - "?start=" + startTime + "&end=" + midTime, MerlinConstants.USER2_NAME); - - //instances should still be killed - InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); - } - - /** - * U2Update test cases. 
- */ - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SubmitU2UpdateFeed() - throws URISyntaxException, IOException, AuthenticationException, JAXBException, - InterruptedException { - FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); - //submit feed - bundles[0].submitClusters(prism); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed.toString())); - String definition = prism.getFeedHelper().getEntityDefinition(feed.toString()).getMessage(); - Assert.assertTrue(definition.contains(feed.getName()) && !definition.contains("(feed) not found"), - "Feed should be already submitted"); - //update feed definition - FeedMerlin newFeed = new FeedMerlin(feed); - newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN); - //try to update feed by U2 - final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(), - MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Feed submitted by first user should not be updated by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleU2UpdateFeed() throws Exception { - FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); - //submit and schedule feed - bundles[0].submitClusters(prism); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed.toString(), Job.Status.RUNNING); - //update feed definition - FeedMerlin newFeed = new FeedMerlin(feed); - newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN); - //try to update feed by U2 - final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(), - MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Feed scheduled by first user should not be updated by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1SubmitU2UpdateProcess() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - String processName = bundles[0].getProcessName(); - //submit process - bundles[0].submitBundle(prism); - String definition = prism.getProcessHelper() - .getEntityDefinition(bundles[0].getProcessData()).getMessage(); - Assert.assertTrue(definition.contains(processName) - && - !definition.contains("(process) not found"), "Process should be already submitted"); - //update process definition - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2020-01-02T01:04Z"); - //try to update process by U2 - final ServiceResponse serviceResponse = prism.getProcessHelper().update(bundles[0] - .getProcessData(), bundles[0].getProcessData(), - MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Process submitted by first user should not be updated by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleU2UpdateProcess() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", 
"2010-01-02T01:04Z"); - //submit, schedule process by U1 - bundles[0].submitFeedsScheduleProcess(prism); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), - Job.Status.RUNNING); - //update process definition - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2020-01-02T01:04Z"); - //try to update process by U2 - final ServiceResponse serviceResponse = prism.getProcessHelper().update(bundles[0] - .getProcessData(), bundles[0].getProcessData(), - MerlinConstants.USER2_NAME); - AssertUtil.assertFailedWithStatus(serviceResponse, HttpStatus.SC_BAD_REQUEST, - "Process scheduled by first user should not be updated by second user"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleFeedU2ScheduleDependantProcessU1UpdateFeed() throws Exception { - FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z"); - //submit both feeds - bundles[0].submitClusters(prism); - bundles[0].submitFeeds(prism); - //schedule input feed by U1 - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString())); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed.toString(), Job.Status.RUNNING); - - //by U2 schedule process dependant on scheduled feed by U1 - ServiceResponse serviceResponse = prism.getProcessHelper() - .submitAndSchedule(bundles[0].getProcessData(), MerlinConstants.USER2_NAME, ""); - AssertUtil.assertSucceeded(serviceResponse); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); - - //get old process details - String oldProcessBundleId = OozieUtil - .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - - //get old feed details - String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, feed.getName(), EntityType.FEED); - String oldFeedUser = getBundleUser(clusterOC, feed.getName(), EntityType.FEED); - - //update feed definition - FeedMerlin newFeed = new FeedMerlin(feed); - newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN); - - //update feed by U1 - serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(), - MerlinConstants.CURRENT_USER_NAME); - AssertUtil.assertSucceeded(serviceResponse); - - //new feed bundle should be created by U1 - OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, newFeed.toString(), true, false); - String newFeedUser = getBundleUser(clusterOC, newFeed.getName(), EntityType.FEED); - Assert.assertEquals(oldFeedUser, newFeedUser, "User should be the same"); - - //new process bundle should be created by U2 - OozieUtil.verifyNewBundleCreation( - clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); - String newProcessUser = - getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleFeedU2ScheduleDependantProcessU2UpdateFeed() throws Exception { - FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z"); - 
//submit both feeds - bundles[0].submitClusters(prism); - bundles[0].submitFeeds(prism); - //schedule input feed by U1 - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString())); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed.toString(), Job.Status.RUNNING); - - //by U2 schedule process dependent on scheduled feed by U1 - ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData(), - MerlinConstants.USER2_NAME, ""); - AssertUtil.assertSucceeded(serviceResponse); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); - - //update feed definition - FeedMerlin newFeed = new FeedMerlin(feed); - newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN); - - //get old process details - String oldProcessBundleId = OozieUtil - .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - - //get old feed details - String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, feed.getName(), EntityType.FEED); - String oldFeedUser = getBundleUser(clusterOC, feed.getName(), EntityType.FEED); - - //update feed by U2 - serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(), - MerlinConstants.USER2_NAME); - AssertUtil.assertSucceeded(serviceResponse); - - //new feed bundle should be created by U2 - OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, newFeed.toString(), true, false); - String newFeedUser = getBundleUser(clusterOC, newFeed.getName(), EntityType.FEED); - Assert.assertNotEquals(oldFeedUser, newFeedUser, "User should not be the same"); - Assert.assertEquals(MerlinConstants.USER2_NAME, newFeedUser); - - //new process bundle should be created by U2 - OozieUtil.verifyNewBundleCreation( - clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); - String newProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleFeedU1ScheduleDependantProcessU1UpdateProcess() throws Exception { - String feed = bundles[0].getInputFeedFromBundle(); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z"); - //submit both feeds - bundles[0].submitClusters(prism); - bundles[0].submitFeeds(prism); - //schedule input feed by U1 - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - - //by U1 schedule process dependent on scheduled feed by U1 - ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData()); - AssertUtil.assertSucceeded(serviceResponse); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); - - //get old process details - String oldProcessBundleId = OozieUtil - .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - - //get old feed details - String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED); - - //update process by 
U1 - ProcessMerlin processObj = bundles[0].getProcessObject().withProperty("randomProp", "randomVal"); - serviceResponse = prism.getProcessHelper().update(bundles[0].getProcessData(), processObj.toString()); - AssertUtil.assertSucceeded(serviceResponse); - - //new feed bundle should not be created - OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, feed, false, false); - - //new process bundle should be created by U1 - OozieUtil.verifyNewBundleCreation( - clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); - String newProcessUser = getBundleUser(clusterOC, processObj.getName(), EntityType.PROCESS); - Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same"); - } - - //disabled since, falcon does not have authorization https://issues.apache - // .org/jira/browse/FALCON-388 - @Test(enabled = false) - public void u1ScheduleFeedU1ScheduleDependantProcessU2UpdateProcess() throws Exception { - String feed = bundles[0].getInputFeedFromBundle(); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z"); - //submit both feeds - bundles[0].submitClusters(prism); - bundles[0].submitFeeds(prism); - //schedule input feed by U1 - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed)); - AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); - - //by U1 schedule process dependent on scheduled feed by U1 - ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData()); - AssertUtil.assertSucceeded(serviceResponse); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); - - //get old process details - String oldProcessBundleId = OozieUtil - .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - - //get old feed details - String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED); - - //update process by U2 - ProcessMerlin processObj = bundles[0].getProcessObject().withProperty("randomProp", "randomVal"); - serviceResponse = prism.getProcessHelper().update(bundles[0].getProcessData(), processObj.toString(), - MerlinConstants.USER2_NAME); - AssertUtil.assertSucceeded(serviceResponse); - - //new feed bundle should not be created - OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, feed, false, false); - - //new process bundle should be created by U2 - OozieUtil.verifyNewBundleCreation( - clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); - String newProcessUser = getBundleUser(clusterOC, processObj.getName(), EntityType.PROCESS); - Assert.assertNotEquals(oldProcessUser, newProcessUser, "User should not be the same"); - Assert.assertEquals(MerlinConstants.USER2_NAME, newProcessUser); - } - - private String getBundleUser(OozieClient oozieClient, String entityName, EntityType entityType) - throws OozieClientException { - String newBundleId = OozieUtil.getLatestBundleID(oozieClient, entityName, entityType); - BundleJob newBundleJob = oozieClient.getBundleJobInfo(newBundleId); - CoordinatorJob coordinatorJob = null; - for (CoordinatorJob coord : newBundleJob.getCoordinators()) { - if ((entityType == EntityType.PROCESS && coord.getAppName().contains("DEFAULT")) - || (entityType == EntityType.FEED && coord.getAppName().contains("RETENTION"))) { - coordinatorJob = coord; - } - } - 
Assert.assertNotNull(coordinatorJob);
-        return coordinatorJob.getUser();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void tearDown() {
-        removeTestClassEntities();
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/CombinedActionsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/CombinedActionsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/CombinedActionsTest.java
deleted file mode 100644
index 7dd3d96..0000000
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/CombinedActionsTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.regression;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.regression.Entities.FeedMerlin;
-import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.helpers.ColoHelper;
-import org.apache.falcon.regression.core.util.HCatUtil;
-import org.apache.falcon.regression.core.util.AssertUtil;
-import org.apache.falcon.regression.core.util.TimeUtil;
-import org.apache.falcon.regression.core.util.HadoopUtil;
-import org.apache.falcon.regression.core.util.BundleUtil;
-import org.apache.falcon.regression.core.util.InstanceUtil;
-import org.apache.falcon.regression.core.util.OSUtil;
-import org.apache.falcon.regression.testHelper.BaseTestClass;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.api.HCatCreateTableDesc;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.OozieClient;
-import org.joda.time.format.DateTimeFormat;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Test where a single workflow contains multiple actions.
- */ -@Test(groups = "embedded") -public class CombinedActionsTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private FileSystem clusterFS = serverFS.get(0); - private OozieClient clusterOC = serverOC.get(0); - private HCatClient clusterHC; - - private final String hiveTestDir = "/HiveData"; - private final String baseTestHDFSDir = cleanAndGetTestDir() + hiveTestDir; - private final String inputHDFSDir = baseTestHDFSDir + "/input"; - private final String outputHDFSDir = baseTestHDFSDir + "/output"; - private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator"; - private static final Logger LOGGER = Logger.getLogger(CombinedActionsTest.class); - private static final String HCATDIR = OSUtil.concat("src", "test", "resources", "hcat"); - private static final String LOCALHCATDATA = OSUtil.concat(HCATDIR, "data"); - public static final String DBNAME = "default"; - public static final String COL1NAME = "id"; - public static final String COL2NAME = "value"; - public static final String PARTITIONCOLUMN = "dt"; - private final String inputTableName = "combinedactionstest_input_table"; - private final String outputTableName = "combinedactionstest_output_table"; - - private String pigMrTestDir = cleanAndGetTestDir() + "/pigMrData"; - private String inputPath = pigMrTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"; - private String outputPathPig = pigMrTestDir + "/output/pig/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"; - private String outputPathMr = pigMrTestDir + "/output/mr/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"; - - @BeforeClass(alwaysRun = true) - public void uploadWorkflow() throws Exception { - - HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.OOZIE_COMBINED_ACTIONS); - } - - @BeforeMethod(alwaysRun = true) - public void setUp(Method method) throws Exception { - LOGGER.info("test name: " + method.getName()); - clusterHC = cluster.getClusterHelper().getHCatClient(); - bundles[0] = BundleUtil.readCombinedActionsBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0))); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() throws Exception { - removeTestClassEntities(); - clusterHC.dropTable(DBNAME, inputTableName, true); - clusterHC.dropTable(DBNAME, outputTableName, true); - HadoopUtil.deleteDirIfExists(pigMrTestDir, clusterFS); - } - - /** - * Schedule a process for which the oozie workflow contains multiple actions like hive, mr and pig. - * The process should succeed.
Fails right now due to: https://issues.apache.org/jira/browse/FALCON-670 - * - * @throws Exception - */ - - @Test - public void combinedMrPigHiveAction() throws Exception { - - //create data for pig, mr and hcat jobs - final String startDate = "2010-01-01T20:00Z"; - final String endDate = "2010-01-02T04:00Z"; - - String inputFeedMrPig = bundles[0].getFeed("sampleFeed1"); - FeedMerlin feedObj = new FeedMerlin(inputFeedMrPig); - - HadoopUtil.deleteDirIfExists(pigMrTestDir + "/input", clusterFS); - List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20); - - HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.concat(OSUtil.NORMAL_INPUT, pigMrTestDir, "input"), - dataDates); - - final String datePattern = StringUtils.join(new String[] { "yyyy", "MM", "dd", "HH", "mm"}, "-"); - dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 60, DateTimeFormat.forPattern(datePattern)); - - final List<String> dataset = HadoopUtil.flattenAndPutDataInFolder(clusterFS, LOCALHCATDATA, - inputHDFSDir, dataDates); - - ArrayList<HCatFieldSchema> cols = new ArrayList<>(); - cols.add(HCatUtil.getStringSchema(COL1NAME, COL1NAME + " comment")); - cols.add(HCatUtil.getStringSchema(COL2NAME, COL2NAME + " comment")); - ArrayList<HCatFieldSchema> partitionCols = new ArrayList<>(); - - partitionCols.add(HCatUtil.getStringSchema(PARTITIONCOLUMN, PARTITIONCOLUMN + " partition")); - clusterHC.createTable(HCatCreateTableDesc - .create(DBNAME, inputTableName, cols) - .partCols(partitionCols) - .ifNotExists(true) - .isTableExternal(true) - .location(inputHDFSDir) - .build()); - - clusterHC.createTable(HCatCreateTableDesc - .create(DBNAME, outputTableName, cols) - .partCols(partitionCols) - .ifNotExists(true) - .isTableExternal(true) - .location(outputHDFSDir) - .build()); - - HCatUtil.addPartitionsToTable(clusterHC, dataDates, dataset, "dt", DBNAME, inputTableName); - - final String tableUriPartitionFragment = StringUtils.join( - new String[]{"#dt=${YEAR}", "${MONTH}", "${DAY}", "${HOUR}", "${MINUTE}"}, "-"); - String inputTableUri = - "catalog:" + DBNAME + ":" + inputTableName + tableUriPartitionFragment; - String outputTableUri = - "catalog:" + DBNAME + ":" + outputTableName + tableUriPartitionFragment; - - //set input and output feeds for bundle - //input feed for both mr and pig jobs - feedObj.setLocation(LocationType.DATA, inputPath); - LOGGER.info(feedObj.toString()); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedObj.toString())); - - //output feed for pig jobs - String outputFeedPig = bundles[0].getFeed("sampleFeed2"); - feedObj = new FeedMerlin(outputFeedPig); - feedObj.setLocation(LocationType.DATA, outputPathPig); - feedObj.setFrequency(new Frequency("5", Frequency.TimeUnit.minutes)); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedObj.toString())); - - //output feed for mr jobs - String outputFeedMr = bundles[0].getFeed("sampleFeed3"); - feedObj = new FeedMerlin(outputFeedMr); - feedObj.setLocation(LocationType.DATA, outputPathMr); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedObj.toString())); - - //input feed for hcat jobs - String inputHive = bundles[0].getFeed("sampleFeedHCat1"); - feedObj = new FeedMerlin(inputHive); - feedObj.getTable().setUri(inputTableUri); - feedObj.setFrequency(new Frequency("1", Frequency.TimeUnit.hours)); - feedObj.getClusters().getClusters().get(0).getValidity() - .setStart(TimeUtil.oozieDateToDate(startDate).toDate()); - feedObj.getClusters().getClusters().get(0).getValidity() -
.setEnd(TimeUtil.oozieDateToDate(endDate).toDate()); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedObj.toString())); - - //output feed for hcat jobs - String outputHive = bundles[0].getFeed("sampleFeedHCat2"); - feedObj = new FeedMerlin(outputHive); - feedObj.getTable().setUri(outputTableUri); - feedObj.setFrequency(new Frequency("1", Frequency.TimeUnit.hours)); - feedObj.getClusters().getClusters().get(0).getValidity() - .setStart(TimeUtil.oozieDateToDate(startDate).toDate()); - feedObj.getClusters().getClusters().get(0).getValidity() - .setEnd(TimeUtil.oozieDateToDate(endDate).toDate()); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedObj.toString())); - - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setProcessValidity(startDate, endDate); - bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours); - bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)"); - AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData())); - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), - 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java deleted file mode 100644 index 3eb7bed..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.OozieClientException; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; -import org.testng.annotations.DataProvider; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Tests for the currentWeek and lastWeek EL expressions. - */ - -@Test(groups = "embedded") -public class ELExpCurrentAndLastWeekTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private OozieClient clusterOC = serverOC.get(0); - private String baseTestDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestDir + "/aggregator"; - private static final Logger LOGGER = Logger.getLogger(ELExpCurrentAndLastWeekTest.class); - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - LOGGER.info("in @BeforeClass"); - uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - - Bundle bundle = BundleUtil.readELBundle(); - bundle.generateUniqueBundle(this); - bundle = new Bundle(bundle, cluster); - - bundle.setInputFeedDataPath(baseTestDir + "/testData" + MINUTE_DATE_PATTERN); - bundle.setProcessWorkflow(aggregateWorkflowDir); - - } - - @BeforeMethod(alwaysRun = true) - public void setUp() throws Exception { - String processStart = "2015-02-17T10:30Z"; - String processEnd = "2015-02-17T10:50Z"; - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(baseTestDir + "/testData" + MINUTE_DATE_PATTERN); - bundles[0].setOutputFeedLocationData(baseTestDir + "/output" + MINUTE_DATE_PATTERN); - bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setInputFeedValidity("2010-04-01T00:00Z", "2099-04-01T00:00Z"); - LOGGER.info("processStart: " + processStart + " processEnd: " + processEnd); - bundles[0].setProcessValidity(processStart, processEnd); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /* Test cases to check the currentWeek and lastWeek EL expressions. - * The missing dependencies of the coordinator are computed from the - * EL definitions given in the entity and are then created. - * They should match the expected missing dependencies; - * if they don't match, the test fails (a worked example of how these - * expressions resolve is sketched after the diffs below).
- * - * */ - @Test(groups = {"singleCluster"}, dataProvider = "EL-DP-Cases") - public void currentAndLastWeekTest(String startInstance, String endInstance, - String firstDep, String endDep) throws Exception { - bundles[0].setDatasetInstances(startInstance, endInstance); - bundles[0].submitFeedsScheduleProcess(prism); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); - - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - List<String> missingDependencies = getMissingDependencies(clusterOC, bundles[0].getProcessName()); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, bundles[0].getProcessName(), 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - List<String> qaDependencies = getQADepedencyList(bundles[0], firstDep, endDep); - Assert.assertTrue(matchDependencies(missingDependencies, qaDependencies)); - } - - @DataProvider(name = "EL-DP-Cases") - public Object[][] getELData() { - return new Object[][]{ - {"currentWeek('WED',2,15)", "currentWeek('WED',2,25)", "2015-02-11T02:15Z", "2015-02-11T02:25Z"}, - {"currentWeek('WED',21,60)", "currentWeek('WED',22,10)", "2015-02-11T22:00Z", "2015-02-11T22:10Z"}, - {"currentWeek('WED',24,60)", "currentWeek('THU',01,10)", "2015-02-12T01:00Z", "2015-02-12T01:10Z"}, - {"currentWeek('WED',04,-60)", "currentWeek('WED',04,10)", "2015-02-11T03:00Z", "2015-02-11T04:10Z"}, - {"currentWeek('SAT',-04,-60)", "currentWeek('SAT',-04,-40)", "2015-02-13T19:00Z", "2015-02-13T19:20Z"}, - {"currentWeek('SAT',-24,-60)", "currentWeek('SAT',-24,-40)", "2015-02-12T23:00Z", "2015-02-12T23:20Z"}, - {"lastWeek('THU',-24,-20)", "lastWeek('THU',-24,-05)", "2015-02-03T23:40Z", "2015-02-03T23:55Z"}, - {"lastWeek('WED',2,15)", "lastWeek('WED',2,25)", "2015-02-04T02:15Z", "2015-02-04T02:25Z"}, - {"lastWeek('WED',21,60)", "lastWeek('WED',22,10)", "2015-02-04T22:00Z", "2015-02-04T22:10Z"}, - {"lastWeek('WED',24,60)", "lastWeek('THU',01,10)", "2015-02-05T01:00Z", "2015-02-05T01:10Z"}, - {"lastWeek('WED',04,-60)", "lastWeek('WED',04,10)", "2015-02-04T03:00Z", "2015-02-04T04:10Z"}, - {"lastWeek('FRI',01,05)", "lastWeek('FRI',01,20)", "2015-02-06T01:05Z", "2015-02-06T01:20Z"}, - {"lastWeek('FRI',01,60)", "lastWeek('FRI',02,20)", "2015-02-06T02:00Z", "2015-02-06T02:20Z"}, - {"lastWeek('FRI',24,00)", "lastWeek('SAT',00,20)", "2015-02-07T00:00Z", "2015-02-07T00:20Z"}, - {"lastWeek('THU',-04,-20)", "lastWeek('THU',-04,-05)", "2015-02-04T19:40Z", "2015-02-04T19:55Z"}, - }; - } - - private boolean matchDependencies(List<String> fromJob, List<String> qaList) { - Collections.sort(fromJob); - Collections.sort(qaList); - if (fromJob.size() != qaList.size()) { - return false; - } - for (int index = 0; index < fromJob.size(); index++) { - if (!fromJob.get(index).contains(qaList.get(index))) { - return false; - } - } - return true; - } - - public List<String> getMissingDependencies(OozieClient oozieClient, - String processName) throws OozieClientException { - List<String> bundles = OozieUtil.getBundles(oozieClient, processName, EntityType.PROCESS); - String coordID = bundles.get(0); - List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID); - for (int i = 0; i < 10 && missingDependencies == null; ++i) { - TimeUtil.sleepSeconds(30); - missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID); - } - Assert.assertNotNull(missingDependencies, "Missing dependencies not 
found."); - return missingDependencies; - } - - private List<String> getQADepedencyList(Bundle bundle, String firstDep, String endDep) { - String path = baseTestDir + "/testData/"; - List<String> returnList = new ArrayList<>(); - List<String> dataSets = TimeUtil.getMinuteDatesOnEitherSide(firstDep, - endDep, bundle.getInitialDatasetFrequency()); - for (String str : dataSets) { - returnList.add(path + str); - } - return returnList; - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpFutureAndLatestTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpFutureAndLatestTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpFutureAndLatestTest.java deleted file mode 100644 index 90826f1..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpFutureAndLatestTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.OozieClient; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.List; - -/** - * EL Expression test. 
- */ -@Test(groups = "embedded") -public class ELExpFutureAndLatestTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private FileSystem clusterFS = serverFS.get(0); - private OozieClient clusterOC = serverOC.get(0); - private String baseTestDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestDir + "/aggregator"; - private static final Logger LOGGER = Logger.getLogger(ELExpFutureAndLatestTest.class); - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - LOGGER.info("in @BeforeClass"); - uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - - Bundle b = BundleUtil.readELBundle(); - b.generateUniqueBundle(this); - b = new Bundle(b, cluster); - - String startDate = TimeUtil.getTimeWrtSystemTime(-20); - String endDate = TimeUtil.getTimeWrtSystemTime(70); - - b.setInputFeedDataPath(baseTestDir + "/testData" + MINUTE_DATE_PATTERN); - b.setProcessWorkflow(aggregateWorkflowDir); - - List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 1); - - HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, - b.getFeedDataPathPrefix(), dataDates); - } - - @BeforeMethod(alwaysRun = true) - public void setUp() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(baseTestDir + "/testData" + MINUTE_DATE_PATTERN); - bundles[0].setOutputFeedLocationData(baseTestDir + "/output" + MINUTE_DATE_PATTERN); - bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setInputFeedValidity("2010-04-01T00:00Z", "2099-04-01T00:00Z"); - String processStart = TimeUtil.getTimeWrtSystemTime(-3); - String processEnd = TimeUtil.getTimeWrtSystemTime(8); - LOGGER.info("processStart: " + processStart + " processEnd: " + processEnd); - bundles[0].setProcessValidity(processStart, processEnd); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - @Test(groups = {"singleCluster"}) - public void latestTest() throws Exception { - bundles[0].setDatasetInstances("latest(-3)", "latest(0)"); - AssertUtil.assertSucceeded(bundles[0].submitFeedsScheduleProcess(prism)); - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - } - - @Test(groups = {"singleCluster"}) - public void futureTest() throws Exception { - bundles[0].setDatasetInstances("future(0,10)", "future(3,10)"); - AssertUtil.assertSucceeded(bundles[0].submitFeedsScheduleProcess(prism)); - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - } -}
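For reference, the expected timestamps in the EL-DP-Cases data provider of ELExpCurrentAndLastWeekTest above are consistent with the following reading of currentWeek/lastWeek, shown as a minimal java.time sketch. This is an inference from the data-provider rows (the process start used by the test is 2015-02-17T10:30Z, a Tuesday), not Falcon's actual implementation: currentWeek('DAY',h,m) appears to resolve to the most recent occurrence of DAY on or before the instance time, taken at midnight and shifted by h hours and m minutes (offsets may be negative or overflow into the next day), and lastWeek is the same result minus one week.

import java.time.DayOfWeek;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;

/** Hypothetical model of currentWeek/lastWeek, inferred from the data-provider rows. */
public final class WeekElSketch {

    // Most recent occurrence of 'day' on or before the nominal time, at midnight,
    // shifted by the (possibly negative or overflowing) hour/minute offsets.
    static ZonedDateTime currentWeek(ZonedDateTime nominal, DayOfWeek day, int hours, int minutes) {
        return nominal.truncatedTo(ChronoUnit.DAYS)
                .with(TemporalAdjusters.previousOrSame(day))
                .plusHours(hours)
                .plusMinutes(minutes);
    }

    // lastWeek is currentWeek shifted back by one week.
    static ZonedDateTime lastWeek(ZonedDateTime nominal, DayOfWeek day, int hours, int minutes) {
        return currentWeek(nominal, day, hours, minutes).minusWeeks(1);
    }

    public static void main(String[] args) {
        // Process start used by the test: 2015-02-17T10:30Z, a Tuesday.
        ZonedDateTime nominal = ZonedDateTime.of(2015, 2, 17, 10, 30, 0, 0, ZoneOffset.UTC);
        System.out.println(currentWeek(nominal, DayOfWeek.WEDNESDAY, 2, 15));  // 2015-02-11T02:15Z
        System.out.println(currentWeek(nominal, DayOfWeek.WEDNESDAY, 24, 60)); // 2015-02-12T01:00Z
        System.out.println(lastWeek(nominal, DayOfWeek.WEDNESDAY, 2, 15));     // 2015-02-04T02:15Z
    }
}

Each printed value matches the corresponding expected dependency timestamp in the data provider, which is how the rows were cross-checked.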

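The latest()/future() instances exercised by ELExpFutureAndLatestTest can be pictured with a similar sketch. It is a rough model of the documented Falcon EL semantics, where latest(0) is the newest available feed instance and negative arguments count back from it, while future(n, limit) is the nth instance after the current time, scanning at most limit instances ahead. The class, its method signatures and its list-based inputs are illustrative assumptions, not Falcon code.

import java.time.ZonedDateTime;
import java.util.List;

/** Illustrative model of latest()/future() over a sorted list of feed-instance times. */
public final class FutureLatestSketch {

    // latest(n): counts back from the newest available instance; latest(0) is the
    // newest, latest(-3) three instances earlier (cf. latestTest above).
    static ZonedDateTime latest(List<ZonedDateTime> availableAscending, int n) {
        return availableAscending.get(availableAscending.size() - 1 + n);
    }

    // future(n, limit): the nth instance strictly after 'now', scanning at most
    // 'limit' upcoming instances (cf. futureTest: future(0,10) .. future(3,10)).
    static ZonedDateTime future(List<ZonedDateTime> instancesAscending, ZonedDateTime now, int n, int limit) {
        int index = 0;
        for (ZonedDateTime t : instancesAscending) {
            if (!t.isAfter(now)) {
                continue; // skip past and current instances
            }
            if (index == n) {
                return t; // found the nth future instance
            }
            if (++index >= limit) {
                break; // looked as far ahead as allowed
            }
        }
        throw new IllegalArgumentException("no such future instance within the limit");
    }
}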