Repository: falcon Updated Branches: refs/heads/master f018fafb4 -> ef4e8a4b1
FALCON-1557 Supporting some Entity Management Api's and admin api in Falcon Unit (Narayan Periwal) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ef4e8a4b Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ef4e8a4b Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ef4e8a4b Branch: refs/heads/master Commit: ef4e8a4b11f0920431aa0fd5c0ca1ed8d3301571 Parents: f018faf Author: Pallavi Rao <[email protected]> Authored: Tue Nov 24 14:04:49 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Nov 24 14:04:49 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/client/AbstractFalconClient.java | 102 +++++++++++++++++++ .../org/apache/falcon/client/FalconClient.java | 5 + .../workflow/engine/OozieWorkflowEngine.java | 2 +- .../client/LocalOozieClientCoordProxy.java | 16 +++ .../oozie/client/LocalProxyOozieClient.java | 2 +- .../falcon/resource/admin/AdminResource.java | 12 ++- .../apache/falcon/unit/FalconUnitClient.java | 62 ++++++++++- .../unit/LocalSchedulableEntityManager.java | 42 ++++++++ .../apache/falcon/unit/FalconUnitTestBase.java | 18 +++- .../org/apache/falcon/unit/TestFalconUnit.java | 77 ++++++++++---- .../resource/ProcessInstanceManagerIT.java | 24 ++--- .../apache/falcon/resource/UnitTestContext.java | 14 +-- 13 files changed, 325 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 66bfa9d..62e5725 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -18,6 +18,8 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1557 Supporting some Entity Management Api's and admin api in Falcon Unit (Narayan Periwal via Pallavi Rao) + FALCON-1622 On starting falcon server JPS shows Main and not Falcon(Sandeep Samudrala via Ajay Yadava) FALCON-1607 Native Scheduler - Code refactoring: Refactor ID into more specific sub classes(Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 27b93c0..43a342d 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -21,6 +21,8 @@ import org.apache.falcon.LifeCycle; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.EntityList; +import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; @@ -176,6 +178,100 @@ public abstract class AbstractFalconClient { FalconCLIException; /** + * Submits and schedules an entity. + * @param entityType Valid options are feed or process. + * @param filePath Path for the entity definition + * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true. + * @param doAsUser proxy user + * @return Result of the submit and schedule command. + */ + public abstract APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, String doAsUser, + String properties) throws FalconCLIException; + + /** + * + * Get list of the entities. + * We have two filtering parameters for entity tags: "tags" and "tagkeys". + * "tags" does the exact match in key=value fashion, while "tagkeys" finds all the entities with the given key as a + * substring in the tags. This "tagkeys" filter is introduced for the user who doesn't remember the exact tag but + * some keywords in the tag. It also helps users to save the time of typing long tags. + * The returned entities will match all the filtering criteria. + * @param entityType Comma-separated entity types. Can be empty. Valid entity types are cluster, feed or process. + * @param fields <optional param> Fields of entity that the user wants to view, separated by commas. + * Valid options are STATUS, TAGS, PIPELINES, CLUSTERS. + * @param nameSubsequence <optional param> Subsequence of entity name. Not case sensitive. + * The entity name needs to contain all the characters in the subsequence in the same order. + * Example 1: "sample1" will match the entity named "SampleFeed1-2". + * Example 2: "mhs" will match the entity named "New-My-Hourly-Summary". + * @param tagKeywords <optional param> Keywords in tags, separated by comma. Not case sensitive. + * The returned entities will have tags that match all the tag keywords. + * @param filterTags <optional param> Return list of entities that have specified tags, separated by a comma. + * Query will do AND on tag values. + * Example: [email protected],[email protected] + * @param filterBy <optional param> Filter results by list of field:value pairs. + * Example: filterBy=STATUS:RUNNING,PIPELINES:clickLogs + * Supported filter fields are NAME, STATUS, PIPELINES, CLUSTER. + * Query will do an AND among filterBy fields. + * @param orderBy <optional param> Field by which results should be ordered. + * Supports ordering by "name". + * @param sortOrder <optional param> Valid options are "asc" and "desc" + * @param offset <optional param> Show results from the offset, used for pagination. Defaults to 0. + * @param numResults <optional param> Number of results to show per request, used for pagination. Only + * integers > 0 are valid, Default is 10. + * @param doAsUser proxy user + * @return Total number of results and a list of entities. + */ + public abstract EntityList getEntityList(String entityType, String fields, String nameSubsequence, String + tagKeywords, String filterBy, String filterTags, String orderBy, String sortOrder, Integer offset, Integer + numResults, String doAsUser) throws FalconCLIException; + + /** + * Given an EntityType and cluster, get list of entities along with summary of N recent instances of each entity. + * @param entityType Valid options are feed or process. + * @param cluster Show entities that belong to this cluster. + * @param start <optional param> Show entity summaries from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * By default, it is set to (end - 2 days). + * @param end <optional param> Show entity summary up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'. + * Default is set to now. + * @param fields <optional param> Fields of entity that the user wants to view, separated by commas. + * Valid options are STATUS, TAGS, PIPELINES. + * @param filterBy <optional param> Filter results by list of field:value pairs. + * Example: filterBy=STATUS:RUNNING,PIPELINES:clickLogs + * Supported filter fields are NAME, STATUS, PIPELINES, CLUSTER. + * Query will do an AND among filterBy fields. + * @param filterTags <optional param> Return list of entities that have specified tags, separated by a comma. + * Query will do AND on tag values. + * Example: [email protected],[email protected] + * @param orderBy <optional param> Field by which results should be ordered. + * Supports ordering by "name". + * @param sortOrder <optional param> Valid options are "asc" and "desc" + * @param offset <optional param> Show results from the offset, used for pagination. Defaults to 0. + * @param numInstances <optional param> Number of results to show per request, used for pagination. Only + * integers > 0 are valid, Default is 10. + * @param numResults <optional param> Number of recent instances to show per entity. Only integers > 0 are + * valid, Default is 7. + * @param doAsUser proxy user + * @return Show entities along with summary of N instances for each entity. + */ + public abstract EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end, + String fields, String filterBy, String filterTags, String + orderBy, String sortOrder, Integer offset, Integer + numResults, Integer numInstances, String doAsUser) throws + FalconCLIException; + + /** + * Force updates the entity. + * @param entityType Valid options are feed or process. + * @param entityName Name of the entity. + * @param colo Colo on which the query should be run. + * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true. + * @param doAsUser proxy user + * @return Result of the validation. + */ + public abstract APIResult touch(String entityType, String entityName, String colo, Boolean skipDryRun, + String doAsUser) throws FalconCLIException; + + /** * Kill currently running instance(s) of an entity. * @param type Valid options are feed or process. * @param entity name of the entity. @@ -345,6 +441,12 @@ public abstract class AbstractFalconClient { String instanceTime, String colo) throws FalconCLIException; + /** + * Get version of the falcon server. + * @return Version of the server. + */ + public abstract String getVersion(String doAsUser) throws FalconCLIException; + protected InputStream getServletInputStream(String clusters, String sourceClusters, String properties) throws FalconCLIException, UnsupportedEncodingException { http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index c49dd08..471cab4 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -377,6 +377,7 @@ public class FalconClient extends AbstractFalconClient { return parseAPIResult(clientResponse); } + @Override public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException { @@ -439,6 +440,7 @@ public class FalconClient extends AbstractFalconClient { return clientResponse.getEntity(TriageResult.class); } + @Override public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords, String filterBy, String filterTags, String orderBy, String sortOrder, Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { @@ -446,6 +448,7 @@ public class FalconClient extends AbstractFalconClient { filterTags, orderBy, sortOrder, offset, numResults, doAsUser); } + @Override public EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end, String fields, String filterBy, String filterTags, String orderBy, String sortOrder, @@ -455,6 +458,7 @@ public class FalconClient extends AbstractFalconClient { orderBy, sortOrder, offset, numResults, numInstances, doAsUser); } + @Override public APIResult touch(String entityType, String entityName, String colo, Boolean skipDryRun, String doAsUser) throws FalconCLIException { Entities operation = Entities.TOUCH; @@ -595,6 +599,7 @@ public class FalconClient extends AbstractFalconClient { return sendAdminRequest(AdminOperations.STACK, doAsUser); } + @Override public String getVersion(String doAsUser) throws FalconCLIException { return sendAdminRequest(AdminOperations.VERSION, doAsUser); } http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index b1cec41..724f646 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -1302,7 +1302,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { coord.getId(), SchemaHelper.formatDateUTC(pauseTime)); change(cluster, coord.getId(), concurrency, null, SchemaHelper.formatDateUTC(pauseTime)); } - change(cluster, coord.getId(), concurrency, endTime, ""); + change(cluster, coord.getId(), concurrency, endTime, null); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java index 093d6ff..ff4561b 100644 --- a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java +++ b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java @@ -20,6 +20,7 @@ package org.apache.oozie.client; import org.apache.oozie.BaseEngineException; import org.apache.oozie.CoordinatorEngine; +import org.apache.oozie.CoordinatorEngineException; import org.apache.oozie.LocalOozieClientCoord; /** @@ -59,4 +60,19 @@ public class LocalOozieClientCoordProxy extends LocalOozieClientCoord { throw new OozieClientException(bex.getErrorCode().toString(), bex); } } + + /** + * Change a coordinator job. + * + * @param jobId job Id. + * @param changeValue change value. + * @throws OozieClientException thrown if the job could not be changed. + */ + public void change(String jobId, String changeValue) throws OozieClientException { + try { + coordEngine.change(jobId, changeValue); + } catch (CoordinatorEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java index 6ae92de..f6e87c4 100644 --- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java +++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java @@ -188,7 +188,7 @@ public class LocalProxyOozieClient extends OozieClient { @Override public void change(final String jobId, final String changeValue) throws OozieClientException { - throw new IllegalStateException("Change not supported"); + getClient(jobId).change(jobId, changeValue); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java index ace21cb..a75c97c 100644 --- a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java +++ b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java @@ -151,20 +151,28 @@ public class AdminResource { } //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck + + /** + * Class for property. + */ @XmlRootElement(name = "property") @XmlAccessorType(XmlAccessType.FIELD) @edu.umd.cs.findbugs.annotations.SuppressWarnings({"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"}) - protected static class Property { + public static class Property { public String key; public String value; } //RESUME CHECKSTYLE CHECK VisibilityModifierCheck //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck + + /** + * Class for list of Properties. + */ @XmlRootElement(name = "properties") @XmlAccessorType(XmlAccessType.FIELD) @edu.umd.cs.findbugs.annotations.SuppressWarnings({"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"}) - protected static class PropertyList { + public static class PropertyList { public List<Property> properties; } //RESUME CHECKSTYLE CHECK VisibilityModifierCheck http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 9f2b714..9eb4277 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -32,13 +32,17 @@ import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.entity.v0.process.Validity; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.EntityList; +import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.resource.admin.AdminResource; import org.apache.falcon.util.DateUtil; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; +import org.apache.hadoop.security.authorize.AuthorizationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,7 +115,13 @@ public class FalconUnitClient extends AbstractFalconClient { @Override public APIResult schedule(EntityType entityType, String entityName, String cluster, Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException { - return schedule(entityType, entityName, null, 0, cluster, skipDryRun, properties); + try { + return localSchedulableEntityManager.schedule(entityType, entityName, skipDryRun, properties); + } catch (FalconException e) { + throw new FalconCLIException(e); + } catch (AuthorizationException e) { + throw new FalconCLIException(e); + } } @Override @@ -242,6 +252,42 @@ public class FalconUnitClient extends AbstractFalconClient { return localSchedulableEntityManager.getStatus(entityType.name(), entityName, colo); } + @Override + public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, String doAsUser, + String properties) throws FalconCLIException { + try { + return localSchedulableEntityManager.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser, + properties); + } catch (FalconException e) { + throw new FalconCLIException(e); + } catch (IOException e) { + throw new FalconCLIException(e); + } + } + + @Override + public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords, + String filterBy, String filterTags, String orderBy, String sortOrder, + Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { + return localSchedulableEntityManager.getEntityList(fields, nameSubsequence, tagKeywords, entityType, filterTags, + filterBy, orderBy, sortOrder, offset, numResults, doAsUser); + } + + @Override + public EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end, + String fields, String filterBy, String filterTags, String orderBy, + String sortOrder, Integer offset, Integer numResults, + Integer numInstances, String doAsUser) throws FalconCLIException { + return localSchedulableEntityManager.getEntitySummary(entityType, cluster, start, end, fields, filterBy, + filterTags, orderBy, sortOrder, offset, numResults, numInstances, doAsUser); + } + + @Override + public APIResult touch(String entityType, String entityName, String colo, Boolean skipDryRun, + String doAsUser) throws FalconCLIException { + return localSchedulableEntityManager.touch(entityType, entityName, colo, skipDryRun); + } + public InstancesResult killInstances(String type, String entity, String start, String end, String colo, String clusters, String sourceClusters, List<LifeCycle> lifeCycles, String doAsUser) throws FalconCLIException, UnsupportedEncodingException { @@ -304,6 +350,20 @@ public class FalconUnitClient extends AbstractFalconClient { return localInstanceManager.getInstanceDependencies(entityType, entityName, instanceTime, colo); } + @Override + public String getVersion(String doAsUser) throws FalconCLIException { + AdminResource resource = new AdminResource(); + AdminResource.PropertyList propertyList = resource.getVersion(); + StringBuilder properties = new StringBuilder(); + for(AdminResource.Property property : propertyList.properties) { + if (properties.length() > 1) { + properties.append(","); + } + properties.append(property.key).append(":").append(property.value); + } + return properties.toString(); + } + private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) { if (entityType == EntityType.FEED) { return checkAndUpdateFeedClusters(entity, cluster); http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java index 42adc9a..0065c71 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java @@ -18,10 +18,14 @@ package org.apache.falcon.unit; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractSchedulableEntityManager; +import org.apache.falcon.resource.EntityList; +import org.apache.falcon.resource.EntitySummaryResult; +import org.apache.hadoop.security.authorize.AuthorizationException; import java.io.IOException; import java.io.InputStream; @@ -69,4 +73,42 @@ public class LocalSchedulableEntityManager extends AbstractSchedulableEntityMana Entity entity = super.submitInternal(inputStream, entityType, doAsUser); return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + entityType + ") " + entity.getName()); } + + public APIResult schedule(EntityType entityType, String entityName, Boolean skipDryRun, String properties) throws + FalconException, AuthorizationException { + scheduleInternal(entityType.name(), entityName, skipDryRun, EntityUtil.getPropertyMap(properties)); + return new APIResult(APIResult.Status.SUCCEEDED, entityName + "(" + entityType + ") scheduled successfully"); + } + + public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, String doAsUser, + String properties) throws FalconException, IOException { + InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath); + Entity entity = super.submitInternal(inputStream, entityType, doAsUser); + scheduleInternal(entityType, entity.getName(), skipDryRun, EntityUtil.getPropertyMap(properties)); + return new APIResult(APIResult.Status.SUCCEEDED, + entity.getName() + "(" + entityType + ") scheduled successfully"); + } + + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + public EntityList getEntityList(String fieldStr, String nameSubsequence, String tagKeywords, + String filterType, String filterTags, String filterBy, + String orderBy, String sortOrder, Integer offset, + Integer resultsPerPage, final String doAsUser) { + return super.getEntityList(fieldStr, nameSubsequence, tagKeywords, filterType, filterTags, filterBy, orderBy, + sortOrder, offset, resultsPerPage, doAsUser); + } + + public EntitySummaryResult getEntitySummary(String type, String cluster, String startDate, String endDate, + String fields, String filterBy, String filterTags, + String orderBy, String sortOrder, Integer offset, + Integer resultsPerPage, Integer numInstances, final String doAsUser) { + return super.getEntitySummary(type, cluster, startDate, endDate, fields, filterBy, filterTags, orderBy, + sortOrder, offset, resultsPerPage, numInstances, doAsUser); + } + + public APIResult touch(String type, String entityName, String colo, Boolean skipDryRun) { + return super.touch(type, entityName, colo, skipDryRun); + } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck + } http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 42cb779..2a73516 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -179,15 +179,15 @@ public class FalconUnitTestBase { skipDryRun, properties); } - public APIResult scheduleProcess(String processName, String startTime, int numInstances, - String cluster, Boolean skipDryRun, - String properties) throws FalconException, FalconCLIException { + public APIResult scheduleProcess(String processName, String cluster, String localWfPath) throws FalconException, + IOException, FalconCLIException { Process processEntity = configStore.get(EntityType.PROCESS, processName); if (processEntity == null) { throw new FalconException("Process not found " + processName); } - return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, - skipDryRun, properties); + String workflowPath = processEntity.getWorkflow().getPath(); + fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath, "workflow.xml")); + return falconUnitClient.schedule(EntityType.PROCESS, processName, cluster, false, null, null); } public APIResult schedule(EntityType entityType, String entityName, String cluster) throws FalconException, @@ -199,6 +199,14 @@ public class FalconUnitTestBase { return falconUnitClient.schedule(entityType, entityName, cluster, false, null, null); } + public APIResult submitAndSchedule(String type, String filePath, String localWfPath, Boolean skipDryRun, + String doAsUser, String properties, String appDirectory) throws IOException, + FalconException, FalconCLIException { + createDir(appDirectory); + fs.copyFromLocalFile(new Path(localWfPath), new Path(appDirectory, "workflow.xml")); + return falconUnitClient.submitAndSchedule(type, filePath, skipDryRun, doAsUser, properties); + } + private Map<String, String> updateColoAndCluster(String colo, String cluster, Map<String, String> props) { if (props == null) { props = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 2c8642d..68c09d5 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -24,6 +24,8 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.entity.v0.process.Property; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.EntityList; +import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; @@ -56,8 +58,8 @@ public class TestFalconUnit extends FalconUnitTestBase { private static final String PROCESS_NAME = "process"; private static final String OUTPUT_FEED_NAME = "out"; private static final String INPUT_FILE_NAME = "input.txt"; - private static final String SCHEDULE_TIME = "2015-06-20T00:00Z"; - private static final String END_TIME = "2015-06-20T00:01Z"; + private static final String SCHEDULE_TIME = "2013-11-18T00:05Z"; + private static final String END_TIME = "2013-11-18T00:07Z"; private static final String WORKFLOW = "workflow.xml"; private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml"; @@ -68,8 +70,7 @@ public class TestFalconUnit extends FalconUnitTestBase { createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); - result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW), - true, ""); + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); assertStatus(result); waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED); InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(), @@ -81,7 +82,6 @@ public class TestFalconUnit extends FalconUnitTestBase { Assert.assertTrue(files.length > 0); } - @Test public void testRetention() throws IOException, FalconCLIException, FalconException, ParseException, InterruptedException { @@ -109,16 +109,13 @@ public class TestFalconUnit extends FalconUnitTestBase { @Test public void testSuspendAndResume() throws Exception { submitClusterAndFeeds(); - // submitting and scheduling process - String scheduleTime = "2015-06-20T00:00Z"; - //String processName = "process1"; - createData(INPUT_FEED_NAME, CLUSTER_NAME, scheduleTime, INPUT_FILE_NAME); + // submitting and scheduling process; + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); - result = scheduleProcess(PROCESS_NAME, scheduleTime, 2, CLUSTER_NAME, getAbsolutePath(WORKFLOW), - true, ""); + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 2, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); assertStatus(result); - waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, scheduleTime, InstancesResult.WorkflowStatus.SUCCEEDED); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED); result = getClient().suspend(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null); assertStatus(result); result = getClient().getStatus(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null); @@ -141,8 +138,7 @@ public class TestFalconUnit extends FalconUnitTestBase { createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); - result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW), - true, ""); + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 2, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); assertStatus(result); waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED); result = getClient().delete(EntityType.PROCESS, PROCESS_NAME, null); @@ -180,8 +176,8 @@ public class TestFalconUnit extends FalconUnitTestBase { } @Test - public void testUpdate() throws IOException, FalconCLIException, FalconException, - ParseException, InterruptedException { + public void testUpdateAndTouch() throws IOException, FalconCLIException, FalconException, ParseException, + InterruptedException { submitClusterAndFeeds(); APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); @@ -208,6 +204,8 @@ public class TestFalconUnit extends FalconUnitTestBase { result = falconUnitClient.update(EntityType.PROCESS.name(), PROCESS_NAME, file.getAbsolutePath(), true, null); assertStatus(result); + result = falconUnitClient.touch(EntityType.PROCESS.name(), PROCESS_NAME, null, true, null); + assertStatus(result); process = getEntity(EntityType.PROCESS, PROCESS_NAME); @@ -230,7 +228,6 @@ public class TestFalconUnit extends FalconUnitTestBase { property.setName("dummy"); property.setValue("dummy"); process.getProperties().getProperties().add(property); - } @Test @@ -240,7 +237,7 @@ public class TestFalconUnit extends FalconUnitTestBase { createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); - result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 3, CLUSTER_NAME, getAbsolutePath(SLEEP_WORKFLOW), true, + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 2, CLUSTER_NAME, getAbsolutePath(SLEEP_WORKFLOW), true, ""); assertStatus(result); InstancesResult.WorkflowStatus currentStatus; @@ -280,8 +277,8 @@ public class TestFalconUnit extends FalconUnitTestBase { createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); - result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 3, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, ""); - assertStatus(result); + result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 2, CLUSTER_NAME, getAbsolutePath(SLEEP_WORKFLOW), true, + ""); InstancesResult.WorkflowStatus currentStatus; waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED); currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME); @@ -329,4 +326,44 @@ public class TestFalconUnit extends FalconUnitTestBase { SCHEDULE_TIME, END_TIME, null, null); Assert.assertEquals(feedInstanceResult.getStatus(), APIResult.Status.SUCCEEDED); } + + @Test + public void testEntityList() throws Exception { + submitClusterAndFeeds(); + // submitting and scheduling process + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); + APIResult result = submitAndSchedule(EntityType.PROCESS.name(), getAbsolutePath(PROCESS), + getAbsolutePath(SLEEP_WORKFLOW), true, null, "", PROCESS_APP_PATH); + assertStatus(result); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.RUNNING); + InstancesResult.WorkflowStatus currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), + PROCESS_NAME, SCHEDULE_TIME); + Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.RUNNING); + + EntityList entityList = getClient().getEntityList(EntityType.PROCESS.name(), "", "", null, null, null, null, + null, new Integer(0), new Integer(1), null); + Assert.assertNotNull(entityList.getElements()); + Assert.assertEquals(entityList.getElements().length, 1); + Assert.assertEquals(entityList.getElements()[0].name, PROCESS_NAME); + } + + @Test + public void testEntitySummary() throws Exception { + submitClusterAndFeeds(); + // submitting and scheduling process + createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); + APIResult result = submitAndSchedule(EntityType.PROCESS.name(), getAbsolutePath(PROCESS), + getAbsolutePath(SLEEP_WORKFLOW), true, null, "", PROCESS_APP_PATH); + assertStatus(result); + waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.RUNNING); + InstancesResult.WorkflowStatus currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), + PROCESS_NAME, SCHEDULE_TIME); + Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.RUNNING); + EntitySummaryResult summaryResult = getClient().getEntitySummary(EntityType.PROCESS.name(), CLUSTER_NAME, + SCHEDULE_TIME, END_TIME, "", "", null, null, null, new Integer(0), new Integer(1), new Integer(1), + null); + Assert.assertEquals(summaryResult.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertNotNull(summaryResult.getEntitySummaries()); + Assert.assertEquals(summaryResult.getEntitySummaries().length, 1); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java index 8dbbd7d..6458b59 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java @@ -123,22 +123,22 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { submitCluster(context.colo, context.clusterName, null); context.scheduleProcess(); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); - String endTime = "2012-04-21T00:00Z"; + String endTime = "2012-04-20T00:01Z"; InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, context.colo, null, null, "", "", 0, 1, null); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); - assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING); + Assert.assertEquals(response.getInstances()[0].getStatus(), WorkflowStatus.RUNNING); } @Test public void testGetInstanceStatusPagination() throws Exception { UnitTestContext context = new UnitTestContext(); submitCluster(context.colo, context.clusterName, null); - context.scheduleProcessForPagination(); + context.scheduleProcess(); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); - String endTime = "2012-04-20T00:03Z"; + String endTime = "2012-04-20T00:02Z"; InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, context.colo, null, "STATUS:RUNNING", "startTime", "", 0, new Integer(1), null); @@ -154,7 +154,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { submitCluster(context.colo, context.clusterName, null); context.scheduleProcess(); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); - String endTime = "2012-04-21T00:00Z"; + String endTime = "2012-04-20T00:01Z"; context.getClient().killInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, context.colo, null, null, null, null); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.KILLED); @@ -172,7 +172,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); - assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED); + Assert.assertEquals(response.getInstances()[0].getStatus(), WorkflowStatus.KILLED); } @Test @@ -181,7 +181,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { submitCluster(context.colo, context.clusterName, null); context.scheduleProcess(); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); - String endTime = "2012-04-21T00:00Z"; + String endTime = "2012-04-20T00:01Z"; context.getClient().killInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, context.colo, null, null, null, null); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.KILLED); @@ -202,7 +202,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); - assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING); + Assert.assertEquals(response.getInstances()[0].getStatus(), WorkflowStatus.RUNNING); } @Test @@ -211,7 +211,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { submitCluster(context.colo, context.clusterName, null); context.scheduleProcess(); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); - String endTime = "2012-04-21T00:00Z"; + String endTime = "2012-04-20T00:01Z"; context.getClient().suspendInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, context.colo, context.clusterName, null, null, null); @@ -222,7 +222,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); - assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.SUSPENDED); + Assert.assertEquals(response.getInstances()[0].getStatus(), WorkflowStatus.SUSPENDED); } @Test @@ -231,7 +231,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { submitCluster(context.colo, context.clusterName, null); context.scheduleProcess(); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); - String endTime = "2012-04-21T00:00Z"; + String endTime = "2012-04-20T00:01Z"; context.getClient().suspendInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, context.colo, context.clusterName, null, null, null); @@ -253,6 +253,6 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); - assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING); + Assert.assertEquals(response.getInstances()[0].getStatus(), WorkflowStatus.RUNNING); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/ef4e8a4b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java index 37442c3..1d49353 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java @@ -44,8 +44,6 @@ public class UnitTestContext { public static final String FEED_TEMPLATE2 = "/feed-template2.xml"; public static final String PROCESS_TEMPLATE = "/process-template.xml"; - private static final String START_INSTANCE = "2012-04-20T00:00Z"; - protected String colo; protected String clusterName; protected String processName; @@ -99,14 +97,10 @@ public class UnitTestContext { } public void scheduleProcess() throws Exception { - scheduleProcess(PROCESS_TEMPLATE, overlay, 1); - } - - public void scheduleProcessForPagination() throws Exception { - scheduleProcess(PROCESS_TEMPLATE, overlay, 2); + scheduleProcess(PROCESS_TEMPLATE, overlay); } - public void scheduleProcess(String processTemplate, Map<String, String> uniqueOverlay, int numInstances) throws + public void scheduleProcess(String processTemplate, Map<String, String> uniqueOverlay) throws Exception { prepare(); @@ -122,9 +116,7 @@ public class UnitTestContext { result = client.submit(EntityType.PROCESS.name(), tmpFile, null); Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); - String scheduleTime = START_INSTANCE; - - result = client.schedule(EntityType.PROCESS, processName, scheduleTime, numInstances, clusterName, true, ""); + result = client.schedule(EntityType.PROCESS, processName, clusterName, true, null, null); Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); }
