Repository: falcon Updated Branches: refs/heads/master 42f175a12 -> 9fd86b786
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 678235e..a721666 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -553,9 +553,7 @@ public abstract class AbstractEntityManager { try { Entity entityObj = EntityUtil.getEntity(type, entityName); - Set<Entity> dependents = EntityGraph.get().getDependents(entityObj); - Entity[] dependentEntities = dependents.toArray(new Entity[dependents.size()]); - return new EntityList(dependentEntities, entityObj); + return EntityUtil.getEntityDependencies(entityObj); } catch (Exception e) { LOG.error("Unable to get dependencies for entityName {} ({})", entityName, type, e); throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index 42907c8..72f9fe4 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -19,16 +19,24 @@ package org.apache.falcon.resource; import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.*; +import org.apache.falcon.FalconException; +import org.apache.falcon.FalconWebException; +import org.apache.falcon.LifeCycle; +import org.apache.falcon.Pair; import 
org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.ProcessHelper; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.logging.LogProvider; import org.apache.falcon.resource.InstancesResult.Instance; +import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +45,17 @@ import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; /** * A base class for managing Entity's Instance operations. 
@@ -160,6 +178,63 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } } + + /** Collects the instances that depend on, or are depended on by, one instance of a process or feed. For every cluster name returned by DeploymentUtil.getCurrentClusters(), a PROCESS contributes the results of ProcessHelper.getInputFeedInstances and ProcessHelper.getOutputFeedInstances, and a FEED contributes the result of FeedHelper.getConsumerInstances plus the result of FeedHelper.getProducerInstance when it is non-null; nothing is added for a cluster when the helper's getCluster(...) lookup returns null. Results are accumulated in a HashSet, so equal instances found on several clusters are kept once. NOTE(review): the type check sits inside the cluster loop, so when there are no current clusters an unsupported type is not rejected and an empty SUCCEEDED result is returned - confirm this is intended. @param entityType entity type name, resolved through checkType; only PROCESS and FEED are handled @param entityName name of the process or feed to look up @param instanceTimeString instance time, parsed with EntityUtil.parseDateUTC @param colo colo, validated through checkColo @return a result with status SUCCEEDED and message "Success!" carrying the collected instances as an array @throws the exception built by FalconWebException.newInstanceException with BAD_REQUEST, for any other entity type or when anything is thrown while collecting */ public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName, + String instanceTimeString, String colo) { + checkColo(colo); + EntityType type = checkType(entityType); + Set<SchedulableEntityInstance> result = new HashSet<>(); + + try { + Date instanceTime = EntityUtil.parseDateUTC(instanceTimeString); + for (String clusterName : DeploymentUtil.getCurrentClusters()) { + Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName); + switch (type) { + + case PROCESS: + Process process = EntityUtil.getEntity(EntityType.PROCESS, entityName); + org.apache.falcon.entity.v0.process.Cluster pCluster = ProcessHelper.getCluster(process, + clusterName); + if (pCluster != null) { + Set<SchedulableEntityInstance> inputFeeds = ProcessHelper.getInputFeedInstances(process, + instanceTime, cluster, true); + Set<SchedulableEntityInstance> outputFeeds = ProcessHelper.getOutputFeedInstances(process, + instanceTime, cluster); + result.addAll(inputFeeds); + result.addAll(outputFeeds); + } + break; + + case FEED: + Feed feed = EntityUtil.getEntity(EntityType.FEED, entityName); + org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed, clusterName); + if (fCluster != null) { + Set<SchedulableEntityInstance> consumers = FeedHelper.getConsumerInstances(feed, instanceTime, + cluster); + SchedulableEntityInstance producer = FeedHelper.getProducerInstance(feed, instanceTime, + cluster); + result.addAll(consumers); + if (producer != null) { + result.add(producer); + } + } + break; + + default: + throw FalconWebException.newInstanceException("Instance dependency isn't supported for type: " + + entityType, Response.Status.BAD_REQUEST); + } + } + + } catch (Throwable throwable) { /* NOTE(review): this handler also catches the exception thrown by the default branch above and passes it to newInstanceException a second time. */ + throw FalconWebException.newInstanceException(throwable, Response.Status.BAD_REQUEST); + } + + InstanceDependencyResult res = new
InstanceDependencyResult(APIResult.Status.SUCCEEDED, "Success!"); + res.setDependencies(result.toArray(new SchedulableEntityInstance[0])); + return res; + } + public InstancesSummaryResult getSummary(String type, String entity, String startStr, String endStr, String colo, List<LifeCycle> lifeCycles) { checkColo(colo); http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java index 68219fb..1a8396c 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java @@ -27,18 +27,28 @@ import org.apache.falcon.monitors.Monitored; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractInstanceManager; import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.resource.channel.Channel; import org.apache.falcon.resource.channel.ChannelFactory; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.*; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.lang.reflect.Constructor; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * A proxy implementation of the entity instance operations.
@@ -338,6 +348,29 @@ public class InstanceManagerProxy extends AbstractInstanceManager { } }.execute(colo, type, entity); } + + + /** GET dependencies/{type}/{entity} on the proxy, producing JSON and monitored as the "instance-dependency" event. The request is wrapped in an InstanceProxy whose doExecute, for the colo it is given, calls invoke("instanceDependencies", ...) on getInstanceManager(colo) with the entity type, entity name, instance time string and that colo; execute(colo, entityType, entityName) drives it. Presumably the invoked name maps to the per-colo resource method of the same name - confirm against the Channel implementation. @param entityType value of the {type} path segment @param entityName value of the {entity} path segment @param instanceTimeStr value of the instanceTime query parameter, forwarded unparsed @param colo value of the colo query parameter @return the InstanceDependencyResult returned by execute */ @GET + @Path("dependencies/{type}/{entity}") + @Produces(MediaType.APPLICATION_JSON) + @Monitored(event = "instance-dependency") + public InstanceDependencyResult instanceDependencies( + @Dimension("type") @PathParam("type") final String entityType, + @Dimension("entityName") @PathParam("entity") final String entityName, + @Dimension("instanceTime") @QueryParam("instanceTime") final String instanceTimeStr, + @Dimension("colo") @QueryParam("colo") String colo) { + + return new InstanceProxy<InstanceDependencyResult>(InstanceDependencyResult.class) { + + @Override + protected InstanceDependencyResult doExecute(String colo) throws FalconException { + return getInstanceManager(colo).invoke("instanceDependencies", + entityType, entityName, instanceTimeStr, colo); + } + + }.execute(colo, entityType, entityName); + } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck private abstract class InstanceProxy<T extends APIResult> { http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java index dc533a2..c2ac5b2 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java @@ -23,7 +23,13 @@ import org.apache.falcon.monitors.Dimension; import org.apache.falcon.monitors.Monitored; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.*; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import
javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import java.util.List; @@ -230,4 +236,16 @@ public class InstanceManager extends AbstractInstanceManager { } //RESUME CHECKSTYLE CHECK ParameterNumberCheck + + /** GET dependencies/{type}/{entity}, producing JSON and monitored as the "instance-dependency" event. Adds only the JAX-RS and monitoring annotations; all four arguments are passed straight to the inherited getInstanceDependencies and its result is returned unchanged. @param entityType value of the {type} path segment @param entityName value of the {entity} path segment @param instanceTimeStr value of the instanceTime query parameter @param colo value of the colo query parameter @return whatever the inherited getInstanceDependencies returns */ @GET + @Path("dependencies/{type}/{entity}") + @Produces(MediaType.APPLICATION_JSON) + @Monitored(event = "instance-dependency") + public InstanceDependencyResult instanceDependencies( + @Dimension("type") @PathParam("type") String entityType, + @Dimension("entityName") @PathParam("entity") String entityName, + @Dimension("instanceTime") @QueryParam("instanceTime") String instanceTimeStr, + @Dimension("colo") @QueryParam("colo") String colo) { + return super.getInstanceDependencies(entityType, entityName, instanceTimeStr, colo); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java index fa04add..ed6f44e 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java @@ -23,7 +23,14 @@ import org.apache.falcon.monitors.Dimension; import org.apache.falcon.monitors.Monitored; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java index 797b595..90acb59 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java @@ -287,6 +287,10 @@ public class FalconCLIIT { Assert.assertEquals(executeWithURL("entity -schedule -type feed -name " + overlay.get("outputFeedName")), 0); OozieTestUtils.waitForProcessWFtoStart(context); + //Test the dependency command + Assert.assertEquals(executeWithURL("instance -dependency -type feed -name " + overlay.get("inputFeedName") + + " -instanceTime 2010-01-01T00:00Z"), 0); + Assert.assertEquals(executeWithURL("instance -status -type feed -name " + overlay.get("outputFeedName") + " -start " + START_INSTANCE), 0);
