Repository: falcon Updated Branches: refs/heads/master c219812ca -> 054aa772b
FALCON-1335 Backend support of instance search of a group of entities Tested REST API: api/instance/search; Added unit test p.s. Accidentally deleted the previous branch. Previous reviews: https://github.com/apache/falcon/pull/93 Author: yzheng-hortonworks <[email protected]> Reviewers: "Sowmya <[email protected]>, Peeyush<[email protected]>, Balu <[email protected]>" Closes #106 from yzheng-hortonworks/FALCON-1335 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/054aa772 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/054aa772 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/054aa772 Branch: refs/heads/master Commit: 054aa772b404d96c38dbadcddee1e28a75861bb0 Parents: c219812 Author: yzheng-hortonworks <[email protected]> Authored: Wed Apr 20 11:49:07 2016 -0700 Committer: bvellanki <[email protected]> Committed: Wed Apr 20 11:49:07 2016 -0700 ---------------------------------------------------------------------- .../falcon/metadata/RelationshipType.java | 14 +++ .../org/apache/falcon/metadata/GraphUtils.java | 41 +++++++++ .../falcon/resource/AbstractEntityManager.java | 31 +++++-- .../resource/AbstractInstanceManager.java | 94 ++++++++++++++++++++ .../metadata/AbstractMetadataResource.java | 2 +- .../resource/proxy/InstanceManagerProxy.java | 18 ++++ .../falcon/resource/InstanceManagerTest.java | 70 ++++++++++++++- .../resource/metadata/MetadataTestContext.java | 46 +++++++--- .../apache/falcon/resource/InstanceManager.java | 20 +++++ 9 files changed, 313 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/054aa772/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java index 6624319..47bc377 100644 --- a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java +++ b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java @@ -18,6 +18,8 @@ package org.apache.falcon.metadata; +import org.apache.falcon.entity.v0.EntityType; + /** * Enumerates Relationship types. */ @@ -64,4 +66,16 @@ public enum RelationshipType { throw new IllegalArgumentException("No constant with value " + value + " found"); } + + public static RelationshipType fromSchedulableEntityType(String type) { + EntityType entityType = EntityType.getEnum(type); + switch (entityType) { + case FEED: + return RelationshipType.FEED_ENTITY; + case PROCESS: + return RelationshipType.PROCESS_ENTITY; + default: + throw new IllegalArgumentException("Invalid schedulable entity type: " + entityType.name()); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/054aa772/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java b/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java index 8bec02f..6350e20 100644 --- a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java +++ b/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java @@ -18,9 +18,14 @@ package org.apache.falcon.metadata; +import org.apache.commons.lang3.StringUtils; +import com.thinkaurelius.titan.core.BaseVertexQuery; +import com.thinkaurelius.titan.core.Order; import com.tinkerpop.blueprints.Direction; import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Graph; +import com.tinkerpop.blueprints.GraphQuery; +import com.tinkerpop.blueprints.Query.Compare; import com.tinkerpop.blueprints.Vertex; import com.tinkerpop.blueprints.util.io.graphson.GraphSONWriter; import org.slf4j.Logger; @@ -28,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; +import java.util.Iterator; /** * Utility class for graph operations. @@ -81,4 +87,39 @@ public final class GraphUtils { + edge.getVertex(Direction.IN).getProperty("name") + "]"; } + + public static Vertex findVertex(Graph graph, String name, RelationshipType relationshipType) { + LOG.debug("Finding vertex for: name={}, type={}", name, relationshipType.getName()); + GraphQuery query = graph.query() + .has(RelationshipProperty.NAME.getName(), name) + .has(RelationshipProperty.TYPE.getName(), relationshipType.getName()); + Iterator<Vertex> results = query.vertices().iterator(); + return results.hasNext() ? results.next() : null; // returning one since name is unique + } + + public static BaseVertexQuery addRangeQuery(BaseVertexQuery query, + RelationshipProperty property, String minValue, String maxValue) { + if (StringUtils.isNotEmpty(minValue)) { + query.has(property.getName(), Compare.GREATER_THAN_EQUAL, minValue); + } + if (StringUtils.isNotEmpty(maxValue)) { + query.has(property.getName(), Compare.LESS_THAN_EQUAL, maxValue); + } + return query; + } + + public static BaseVertexQuery addEqualityQuery(BaseVertexQuery query, RelationshipProperty property, String value) { + if (StringUtils.isNotEmpty(value)) { + query.has(property.getName(), value); + } + return query; + } + + public static BaseVertexQuery addOrderLimitQuery(BaseVertexQuery query, String orderBy, int numResults) { + if (StringUtils.isNotEmpty(orderBy)) { + query.orderBy(orderBy, Order.DESC); + } + query.limit(numResults); + return query; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/054aa772/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index fde0cd7..7d5945a 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -41,6 +41,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.resource.APIResult.Status; import org.apache.falcon.resource.EntityList.EntityElement; +import org.apache.falcon.resource.metadata.AbstractMetadataResource; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.util.DeploymentUtil; @@ -72,7 +73,7 @@ import java.util.Set; /** * A base class for managing Entity operations. */ -public abstract class AbstractEntityManager { +public abstract class AbstractEntityManager extends AbstractMetadataResource { private static final Logger LOG = LoggerFactory.getLogger(AbstractEntityManager.class); private static MemoryLocks memoryLocks = MemoryLocks.getInstance(); protected static final String DO_AS_PARAM = "doAs"; @@ -629,6 +630,15 @@ public abstract class AbstractEntityManager { String filterType, String filterTags, String filterBy, String orderBy, String sortOrder, Integer offset, Integer resultsPerPage, final String doAsUser) { + return getEntityList(fieldStr, nameSubsequence, tagKeywords, filterType, filterTags, filterBy, + orderBy, sortOrder, offset, resultsPerPage, doAsUser, false); + } + + + public EntityList getEntityList(String fieldStr, String nameSubsequence, String tagKeywords, + String filterType, String filterTags, String filterBy, + String orderBy, String sortOrder, Integer offset, + Integer resultsPerPage, final String doAsUser, boolean isReturnAll) { HashSet<String> fields = new HashSet<String>(Arrays.asList(fieldStr.toUpperCase().split(","))); Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(filterBy); for (String key : filterByFieldsValues.keySet()) { @@ -642,7 +652,8 @@ public abstract class AbstractEntityManager { nameSubsequence, tagKeywords, filterType, filterTags, filterBy, doAsUser); // sort entities and pagination - List<Entity> entitiesReturn = sortEntitiesPagination(entities, orderBy, sortOrder, offset, resultsPerPage); + List<Entity> entitiesReturn = sortEntitiesPagination( + entities, orderBy, sortOrder, offset, resultsPerPage, isReturnAll); // add total number of results EntityList entityList = entitiesReturn.size() == 0 @@ -681,16 +692,22 @@ public abstract class AbstractEntityManager { entityType, nameSubsequence, tagKeywords, filterByFieldsValues, "", "", "", doAsUser)); } } + return entities; } protected List<Entity> sortEntitiesPagination(List<Entity> entities, String orderBy, String sortOrder, Integer offset, Integer resultsPerPage) { + return sortEntitiesPagination(entities, orderBy, sortOrder, offset, resultsPerPage, false); + } + + protected List<Entity> sortEntitiesPagination(List<Entity> entities, String orderBy, String sortOrder, + Integer offset, Integer resultsPerPage, boolean isReturnAll) { // sort entities entities = sortEntities(entities, orderBy, sortOrder); // pagination - int pageCount = getRequiredNumberOfResults(entities.size(), offset, resultsPerPage); + int pageCount = getRequiredNumberOfResults(entities.size(), offset, resultsPerPage, isReturnAll); List<Entity> entitiesReturn = new ArrayList<Entity>(); if (pageCount > 0) { entitiesReturn.addAll(entities.subList(offset, (offset + pageCount))); @@ -1031,13 +1048,17 @@ public abstract class AbstractEntityManager { } protected int getRequiredNumberOfResults(int arraySize, int offset, int numresults) { + return getRequiredNumberOfResults(arraySize, offset, numresults, false); + } + + protected int getRequiredNumberOfResults(int arraySize, int offset, int numresults, boolean isReturnAll) { /* Get a subset of elements based on offset and count. When returning subset of elements, elements[offset] is included. Size 10, offset 10, return empty list. Size 10, offset 5, count 3, return elements[5,6,7]. Size 10, offset 5, count >= 5, return elements[5,6,7,8,9] return elements starting from elements[offset] until the end OR offset+numResults*/ - if (numresults < 1) { + if (!isReturnAll && numresults < 1) { LOG.error("Value for param numResults should be > than 0 : {}", numresults); throw FalconWebException.newAPIException("Value for param numResults should be > than 0 : " + numresults); } @@ -1050,7 +1071,7 @@ public abstract class AbstractEntityManager { } int retLen = arraySize - offset; - if (retLen > numresults) { + if (!isReturnAll && retLen > numresults) { retLen = numresults; } return retLen; http://git-wip-us.apache.org/repos/asf/falcon/blob/054aa772/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index 1895ba5..ba183c8 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -37,6 +37,9 @@ import java.util.Set; import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServletRequest; +import com.thinkaurelius.titan.core.TitanMultiVertexQuery; +import com.thinkaurelius.titan.core.TitanVertex; +import com.thinkaurelius.titan.graphdb.blueprints.TitanBlueprintsGraph; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; @@ -59,6 +62,10 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.logging.LogProvider; +import org.apache.falcon.metadata.GraphUtils; +import org.apache.falcon.metadata.RelationshipLabel; +import org.apache.falcon.metadata.RelationshipProperty; +import org.apache.falcon.metadata.RelationshipType; import org.apache.falcon.resource.InstancesResult.Instance; import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary; import org.apache.falcon.util.DeploymentUtil; @@ -676,6 +683,93 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } } + public InstancesResult searchInstances(String type, String nameSubsequence, String tagKeywords, + String nominalStartTime, String nominalEndTime, + String status, String orderBy, Integer offset, Integer resultsPerPage) { + type = org.apache.commons.lang.StringUtils.isEmpty(type) ? "feed,process" : type; + resultsPerPage = resultsPerPage == null ? getDefaultResultsPerPage() : resultsPerPage; + + // filter entities + EntityList entityList = getEntityList( + "", nameSubsequence, tagKeywords, type, "", "", "", "", 0, 0, "", true); + + // search instances with TitanDB + TitanBlueprintsGraph titanGraph = (TitanBlueprintsGraph) getGraph(); + Map<TitanVertex, Iterable<TitanVertex>> instanceMap = titanInstances( + titanGraph, entityList, resultsPerPage + offset, nominalStartTime, nominalEndTime, status, orderBy); + + // integrate search results from each entity + List<Instance> instances = consolidateTitanInstances(instanceMap); + + // sort by descending order and pagination + List<Instance> instancesReturn = sortInstancesPagination(instances, orderBy, "desc", offset, resultsPerPage); + + // output format + InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, "Instances Search Results"); + result.setInstances(instancesReturn.toArray(new Instance[instancesReturn.size()])); + titanGraph.commit(); + return result; + } + + private Map<TitanVertex, Iterable<TitanVertex>> titanInstances(TitanBlueprintsGraph titanGraph, + EntityList entityList, int numTopInstances, + String nominalStartTime, String nominalEndTime, + String status, String orderBy) { + List<TitanVertex> entityVertices = new ArrayList<TitanVertex>(); + for (EntityList.EntityElement entityElement : entityList.getElements()) { + String entityName = entityElement.name; + String entityType = entityElement.type; + RelationshipType relationshipType = RelationshipType.fromSchedulableEntityType(entityType); + TitanVertex entityVertex = (TitanVertex) GraphUtils.findVertex(titanGraph, entityName, relationshipType); + if (entityVertex == null) { + LOG.warn("No entity vertex found for type " + entityType + ", name " + entityName); + } else { + entityVertices.add(entityVertex); + } + } + + if (entityVertices.isEmpty()) { // Need to add at least one vertex for TitanMultiVertexQuery + return new HashMap<>(); + } + + TitanMultiVertexQuery vertexQuery = titanGraph.multiQuery(entityVertices) + .labels(RelationshipLabel.INSTANCE_ENTITY_EDGE.getName()); + GraphUtils.addRangeQuery(vertexQuery, RelationshipProperty.NOMINAL_TIME, nominalStartTime, nominalEndTime); + GraphUtils.addEqualityQuery(vertexQuery, RelationshipProperty.STATUS, status); + GraphUtils.addOrderLimitQuery(vertexQuery, orderBy, numTopInstances); + return vertexQuery.vertices(); + } + + private List<Instance> consolidateTitanInstances(Map<TitanVertex, Iterable<TitanVertex>> instanceMap) { + List<Instance> instances = new ArrayList<>(); + for (Iterable<TitanVertex> vertices : instanceMap.values()) { + for (TitanVertex vertex : vertices) { + Instance instance = new Instance(); + instance.instance = vertex.getProperty(RelationshipProperty.NAME.getName()); + String instanceStatus = vertex.getProperty(RelationshipProperty.STATUS.getName()); + if (StringUtils.isNotEmpty(instanceStatus)) { + instance.status = InstancesResult.WorkflowStatus.valueOf(instanceStatus); + } + instances.add(instance); + } + } + return instances; + } + + protected List<Instance> sortInstancesPagination(List<Instance> instances, String orderBy, String sortOrder, + Integer offset, Integer resultsPerPage) { + // sort instances + instances = sortInstances(instances, orderBy, sortOrder); + + // pagination + int pageCount = super.getRequiredNumberOfResults(instances.size(), offset, resultsPerPage); + List<Instance> instancesReturn = new ArrayList<Instance>(); + if (pageCount > 0) { + instancesReturn.addAll(instances.subList(offset, (offset + pageCount))); + } + return instancesReturn; + } + private void checkName(String entityName) { if (StringUtils.isBlank(entityName)) { throw FalconWebException.newAPIException("Instance name is mandatory and shouldn't be blank"); http://git-wip-us.apache.org/repos/asf/falcon/blob/054aa772/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java index 762becb..7e65ef8 100644 --- a/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java +++ b/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java @@ -64,7 +64,7 @@ public abstract class AbstractMetadataResource { private void checkIfMetadataMappingServiceIsEnabled() { if (service == null) { throw FalconWebException.newMetadataResourceException( - "Lineage " + MetadataMappingService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND); + MetadataMappingService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/054aa772/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java index 4951c4a..1023923 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java @@ -603,6 +603,24 @@ public class InstanceManagerProxy extends AbstractInstanceManager { }.execute(colo, entityType, entityName); } + @GET + @Path("search") + @Produces(MediaType.APPLICATION_JSON) + @Monitored(event = "instance-search") + @Override + public InstancesResult searchInstances( + @DefaultValue("") @QueryParam("type") String type, + @DefaultValue("") @QueryParam("nameseq") String nameSubsequence, + @DefaultValue("") @QueryParam("tagkeys") String tagKeywords, + @DefaultValue("") @QueryParam("start") String nominalStartTime, + @DefaultValue("") @QueryParam("end") String nominalEndTime, + @DefaultValue("") @QueryParam("instanceStatus") String status, + @DefaultValue("") @QueryParam("orderBy") String orderBy, + @DefaultValue("0") @QueryParam("offset") Integer offset, + @QueryParam("numResults") Integer resultsPerPage) { + return super.searchInstances(type, nameSubsequence, tagKeywords, nominalStartTime, nominalEndTime, + status, orderBy, offset, resultsPerPage); + } //RESUME CHECKSTYLE CHECK ParameterNumberCheck http://git-wip-us.apache.org/repos/asf/falcon/blob/054aa772/prism/src/test/java/org/apache/falcon/resource/InstanceManagerTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/resource/InstanceManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/InstanceManagerTest.java index 29762ab..237868d 100644 --- a/prism/src/test/java/org/apache/falcon/resource/InstanceManagerTest.java +++ b/prism/src/test/java/org/apache/falcon/resource/InstanceManagerTest.java @@ -19,15 +19,79 @@ package org.apache.falcon.resource; import org.apache.falcon.FalconWebException; +import org.apache.falcon.resource.metadata.MetadataTestContext; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; /** - * + * Unit tests for org.apache.falcon.resource.AbstractInstanceManager. */ -public class InstanceManagerTest extends AbstractInstanceManager { +public class InstanceManagerTest { + + private static final String PROCESS_NAME_PREFIX = "instance-search-test-process"; + private static final String PROCESS_NAME_1 = "instance-search-test-process-1"; + private static final String PROCESS_NAME_2 = "instance-search-test-process-2"; + private static final String NOMINAL_TIME_1 = "2015-01-01-01-00"; + private static final String NOMINAL_TIME_2 = "2015-01-02-01-00"; + private static final String NOMINAL_TIME_3 = "2015-01-03-01-00"; + + private MetadataTestContext testContext; + + @BeforeClass + public void setUp() throws Exception { + testContext = new MetadataTestContext(); + testContext.setUp(); + } + + @AfterClass + public void tearDown() throws Exception { + testContext.tearDown(); + } + + @Test + public void testInstanceSearch() throws Exception { + // Note: all the following tests are based on entity name prefix PROCESS_NAME_PREFIX + testContext.addProcessEntity(PROCESS_NAME_1); + testContext.addInstance(PROCESS_NAME_1, NOMINAL_TIME_1, MetadataTestContext.SUCCEEDED_STATUS); + testContext.addInstance(PROCESS_NAME_1, NOMINAL_TIME_2, MetadataTestContext.SUCCEEDED_STATUS); + testContext.addInstance(PROCESS_NAME_1, NOMINAL_TIME_3, MetadataTestContext.RUNNING_STATUS); + testContext.addProcessEntity(PROCESS_NAME_2); + testContext.addInstance(PROCESS_NAME_2, NOMINAL_TIME_1, MetadataTestContext.FAILED_STATUS); + testContext.addInstance(PROCESS_NAME_2, NOMINAL_TIME_2, MetadataTestContext.RUNNING_STATUS); + + // list all instances + BaseInstanceManager instanceManager = new BaseInstanceManager(); + InstancesResult result; + result = instanceManager.searchInstances("PROCESS", PROCESS_NAME_PREFIX, "", "", "", "", "", 0, 10); + Assert.assertEquals(result.getInstances().length, 5); + + // running status + result = instanceManager.searchInstances("PROCESS", PROCESS_NAME_PREFIX, "", "", "", + MetadataTestContext.RUNNING_STATUS, "", 0, 10); + Assert.assertEquals(result.getInstances().length, 2); + + // succeeded status + result = instanceManager.searchInstances("PROCESS", PROCESS_NAME_PREFIX, "", "", "", + MetadataTestContext.SUCCEEDED_STATUS, "", 0, 10); + Assert.assertEquals(result.getInstances().length, 2); + + // failed status + result = instanceManager.searchInstances("PROCESS", PROCESS_NAME_PREFIX, "", "", "", + MetadataTestContext.FAILED_STATUS, "", 0, 10); + Assert.assertEquals(result.getInstances().length, 1); + + // nominal time filter + result = instanceManager.searchInstances("PROCESS", PROCESS_NAME_PREFIX, "", NOMINAL_TIME_2, "", "", "", 0, 10); + Assert.assertEquals(result.getInstances().length, 3); + } @Test(expectedExceptions = FalconWebException.class) public void test() { - super.triageInstance("process", "random", "2014-05-07T00:00Z", "default"); + BaseInstanceManager instanceManager = new BaseInstanceManager(); + instanceManager.triageInstance("process", "random", "2014-05-07T00:00Z", "default"); } + + private class BaseInstanceManager extends AbstractInstanceManager {} } http://git-wip-us.apache.org/repos/asf/falcon/blob/054aa772/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java index a382d85..47d6ba1 100644 --- a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java +++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java @@ -55,8 +55,9 @@ import java.util.List; */ public class MetadataTestContext { public static final String FALCON_USER = "falcon-user"; - private static final String LOGS_DIR = "target/log"; - private static final String NOMINAL_TIME = "2014-01-01-01-00"; + public static final String SUCCEEDED_STATUS = "SUCCEEDED"; + public static final String FAILED_STATUS = "FAILED"; + public static final String RUNNING_STATUS = "RUNNING"; public static final String OPERATION = "GENERATE"; public static final String CLUSTER_ENTITY_NAME = "primary-cluster"; @@ -74,6 +75,9 @@ public class MetadataTestContext { public static final String OUTPUT_INSTANCE_PATHS = "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101"; + private static final String LOGS_DIR = "target/log"; + private static final String NOMINAL_TIME = "2014-01-01-01-00"; + private ConfigurationStore configStore; private MetadataMappingService service; @@ -99,8 +103,8 @@ public class MetadataTestContext { addClusterEntity(); addFeedEntity(); - addProcessEntity(); - addInstance(); + addProcessEntity(PROCESS_ENTITY_NAME); + addInstance(PROCESS_ENTITY_NAME, NOMINAL_TIME, SUCCEEDED_STATUS); } public MetadataMappingService getService() { @@ -164,9 +168,9 @@ public class MetadataTestContext { } } - public void addProcessEntity() throws Exception { + public void addProcessEntity(String processEntityName) throws Exception { org.apache.falcon.entity.v0.process.Process processEntity = - EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME, + EntityBuilderTestUtil.buildProcess(processEntityName, clusterEntity, "classified-as=Critical", "testPipeline"); EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION); @@ -198,11 +202,25 @@ public class MetadataTestContext { configStore.publish(EntityType.PROCESS, processEntity); } - public void addInstance() throws Exception { + public void addInstance(String entityName, String nominalTime, String status) throws Exception { createJobCountersFileForTest(); - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(), + WorkflowExecutionContext context = WorkflowExecutionContext.create( + getTestMessageArgs(entityName, nominalTime, status), WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); + switch (status) { + case SUCCEEDED_STATUS: + service.onSuccess(context); + break; + case FAILED_STATUS: + service.onFailure(context); + break; + case RUNNING_STATUS: + service.onStart(context); + break; + default: + // Should not reach here + Assert.assertTrue(false, "Adding instance with an unsupported status in test context: " + status); + } } private void cleanupGraphStore(Graph graph) { @@ -240,12 +258,12 @@ public class MetadataTestContext { } } - private static String[] getTestMessageArgs() { + private static String[] getTestMessageArgs(String entityName, String nominalTime, String status) { return new String[]{ "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME, "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"), - "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME, - "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME, + "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), entityName, + "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), nominalTime, "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION, "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES, "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS, @@ -254,8 +272,8 @@ public class MetadataTestContext { "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00", "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER, "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1", - "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED", - "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME, + "-" + WorkflowExecutionArgs.STATUS.getName(), status, + "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), nominalTime, "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie", "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id", "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME, http://git-wip-us.apache.org/repos/asf/falcon/blob/054aa772/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java index 7108597..5b47dc9 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java @@ -306,6 +306,26 @@ public class InstanceManager extends AbstractInstanceManager { throw FalconWebException.newAPIException(throwable); } } + + @GET + @Path("search") + @Produces(MediaType.APPLICATION_JSON) + @Monitored(event = "instance-search") + @Override + public InstancesResult searchInstances( + @DefaultValue("") @QueryParam("type") String type, + @DefaultValue("") @QueryParam("nameseq") String nameSubsequence, + @DefaultValue("") @QueryParam("tagkeys") String tagKeywords, + @DefaultValue("") @QueryParam("start") String nominalStartTime, + @DefaultValue("") @QueryParam("end") String nominalEndTime, + @DefaultValue("") @QueryParam("instanceStatus") String status, + @DefaultValue("") @QueryParam("orderBy") String orderBy, + @DefaultValue("0") @QueryParam("offset") Integer offset, + @QueryParam("numResults") Integer resultsPerPage) { + return super.searchInstances(type, nameSubsequence, tagKeywords, nominalStartTime, nominalEndTime, + status, orderBy, offset, resultsPerPage); + } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck
