Repository: falcon Updated Branches: refs/heads/master a61349d6e -> 98e12502a
FALCON-796 Enable users to triage data processing issues through falcon. Contributed by Ajay Yadava Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/98e12502 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/98e12502 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/98e12502 Branch: refs/heads/master Commit: 98e12502a81e3e20b872431d1fdde06ba77c4880 Parents: a61349d Author: Ajay Yadava <[email protected]> Authored: Mon Jun 29 17:05:42 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Jun 29 17:05:42 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 4 +- .../java/org/apache/falcon/ResponseHelper.java | 19 +- .../java/org/apache/falcon/cli/FalconCLI.java | 15 +- .../org/apache/falcon/client/FalconClient.java | 15 ++ .../apache/falcon/resource/TriageResult.java | 87 ++++++++++ docs/src/site/twiki/FalconCLI.twiki | 8 + docs/src/site/twiki/restapi/ResourceList.twiki | 1 + docs/src/site/twiki/restapi/Triage.twiki | 44 +++++ .../resource/AbstractInstanceManager.java | 174 +++++++++++++++++++ .../resource/proxy/InstanceManagerProxy.java | 20 +++ .../apache/falcon/resource/InstanceManager.java | 13 ++ 11 files changed, 394 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 87589c1..14658f1 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,8 +4,10 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES - FALCON-1039 Add instance dependency API in falcon(Ajay Yadava) + FALCON-1039 Add instance dependency API in falcon (Ajay Yadava) + FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava) + IMPROVEMENTS FALCON-1293 Update CHANGES.txt to change 0.6.1 branch to release 
(Shaik Idris Ali via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/client/src/main/java/org/apache/falcon/ResponseHelper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ResponseHelper.java b/client/src/main/java/org/apache/falcon/ResponseHelper.java index 78598ba..ec6604d 100644 --- a/client/src/main/java/org/apache/falcon/ResponseHelper.java +++ b/client/src/main/java/org/apache/falcon/ResponseHelper.java @@ -18,16 +18,18 @@ package org.apache.falcon; -import java.util.Date; -import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; -import org.apache.falcon.resource.EntitySummaryResult; +import org.apache.falcon.resource.TriageResult; + +import java.util.Date; +import java.util.Map; /** * Helpers for response object to string conversion. 
@@ -266,6 +268,17 @@ public final class ResponseHelper { return sb.toString(); } + public static String getString(TriageResult triageResult) { + StringBuilder sb = new StringBuilder(); + + sb.append(triageResult.toString()); + sb.append("\nAdditional Information:\n"); + sb.append("Response: ").append(triageResult.getMessage()); + sb.append("Request Id: ").append(triageResult.getRequestId()); + + return sb.toString(); + } + public static String getString(FeedLookupResult feedLookupResult) { StringBuilder sb = new StringBuilder(); String results = feedLookupResult.toString(); http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index f169917..cc041c0 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -118,6 +118,7 @@ public class FalconCLI { public static final String LIFECYCLE_OPT = "lifecycle"; public static final String PARARMS_OPT = "params"; public static final String LISTING_OPT = "listing"; + public static final String TRIAGE_OPT = "triage"; // Recipe Command public static final String RECIPE_CMD = "recipe"; @@ -252,8 +253,14 @@ public class FalconCLI { validateSortOrder(sortOrder); validateInstanceCommands(optionsList, entity, type, colo); - - if (optionsList.contains(DEPENDENCY_OPT)) { + if (optionsList.contains(TRIAGE_OPT)) { + validateNotEmpty(colo, COLO_OPT); + validateNotEmpty(start, START_OPT); + validateNotEmpty(type, ENTITY_TYPE_OPT); + validateEntityTypeForSummary(type); + validateNotEmpty(entity, ENTITY_NAME_OPT); + result = ResponseHelper.getString(client.triage(type, entity, start, colo)); + } else if (optionsList.contains(DEPENDENCY_OPT)) { validateNotEmpty(instanceTime, INSTANCE_TIME_OPT);
InstanceDependencyResult response = client.getInstanceDependencies(type, entity, instanceTime, colo); result = ResponseHelper.getString(response); @@ -798,6 +805,9 @@ public class FalconCLI { false, "Displays dependent instances for a specified instance."); + Option triage = new Option(TRIAGE_OPT, false, + "Triage a feed or process instance and find the failures in its lineage."); + OptionGroup group = new OptionGroup(); group.addOption(running); group.addOption(list); @@ -812,6 +822,7 @@ public class FalconCLI { group.addOption(params); group.addOption(listing); group.addOption(dependency); + group.addOption(triage); Option url = new Option(URL_OPTION, true, "Falcon URL"); Option start = new Option(START_OPT, true, http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 20c32e4..5df8626 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -41,6 +41,7 @@ import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.resource.LineageGraphResult; +import org.apache.falcon.resource.TriageResult; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; @@ -244,6 +245,7 @@ public class FalconClient { SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON), PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON), DEPENDENCY("api/instance/dependencies/",
HttpMethod.GET, MediaType.APPLICATION_JSON), + TRIAGE("api/instance/triage/", HttpMethod.GET, MediaType.APPLICATION_JSON), LISTING("api/instance/listing/", HttpMethod.GET, MediaType.APPLICATION_JSON); private String path; @@ -370,6 +372,19 @@ public class FalconClient { //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + public TriageResult triage(String entityType, String entityName, String instanceTime, String colo) + throws FalconCLIException { + ClientResponse clientResponse = service + .path(Instances.TRIAGE.path).path(entityType).path(entityName) + .queryParam("start", instanceTime).queryParam("colo", colo) + .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(Instances.TRIAGE.mimeType).type(MediaType.TEXT_XML) + .method(Instances.TRIAGE.method, ClientResponse.class); + + checkIfSuccessful(clientResponse); + return clientResponse.getEntity(TriageResult.class); + } + public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords, String filterBy, String filterTags, String orderBy, String sortOrder, Integer offset, Integer numResults) throws FalconCLIException { http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/client/src/main/java/org/apache/falcon/resource/TriageResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/TriageResult.java b/client/src/main/java/org/apache/falcon/resource/TriageResult.java new file mode 100644 index 0000000..a5ffe74 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/resource/TriageResult.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Result for instance triage. + */ +@XmlRootElement(name = "result") +@XmlAccessorType(XmlAccessType.FIELD) +@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) +public class TriageResult extends APIResult { + + @XmlElement(name = "triageGraphs") + private LineageGraphResult[] triageGraphs; + + //For JAXB + private TriageResult() { + super(); + } + + public TriageResult(Status status, String message) { + super(status, message); + } + + + + public LineageGraphResult[] getTriageGraphs() { + return triageGraphs; + } + + public void setTriageGraphs(LineageGraphResult[] triageGraphs) { + this.triageGraphs = triageGraphs; + } + + + @Override + public Object[] getCollection() { + return getTriageGraphs(); + } + + + @Override + public void setCollection(Object[] items) { + if (items == null) { + setTriageGraphs(new LineageGraphResult[0]); + } else { + LineageGraphResult[] graphs = new LineageGraphResult[items.length]; + for (int index = 0; index < items.length; index++) { + graphs[index] = (LineageGraphResult)items[index]; + } + setTriageGraphs(graphs); + } + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); +
if (triageGraphs != null) { + for (LineageGraphResult graph : triageGraphs) { + buffer.append(graph.getDotNotation()); + buffer.append("\n\n"); + } + } + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index 233e4a6..aeab8f8 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -272,6 +272,14 @@ This can be used with instance management options. Default values are replicatio Usage: $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -status -lifecycle <<lifecycletype>> -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" +---+++Triage + +Given a feed/process instance, this command traces its ancestors to find which ancestors have failed. It's useful if +a lot of instances are failing in a pipeline, as it then finds the root cause of the pipeline being stuck. + +Usage: +$FALCON_HOME/bin/falcon instance -triage -type <<feed/process>> -name <<name>> -start "yyyy-MM-dd'T'HH:mm'Z'" + ---+++Params Displays the workflow params of a given instance. Where start time is considered as nominal time of that instance and end time won't be considered.
http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/docs/src/site/twiki/restapi/ResourceList.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki index 49dddb7..0094c39 100644 --- a/docs/src/site/twiki/restapi/ResourceList.twiki +++ b/docs/src/site/twiki/restapi/ResourceList.twiki @@ -66,6 +66,7 @@ See also: [[../Security.twiki][Security in Falcon]] | POST | [[InstanceResume][api/instance/resume/:entity-type/:entity-name]] | Resume a given instance | | POST | [[InstanceRerun][api/instance/rerun/:entity-type/:entity-name]] | Rerun a given instance | | GET | [[InstanceLogs][api/instance/logs/:entity-type/:entity-name]] | Get logs of a given instance | +| GET | [[Triage][api/instance/triage/:entity-type/:entity-name]] | Triage an instance to see its stuck lineage | | GET | [[InstanceSummary][api/instance/summary/:entity-type/:entity-name]] | Return summary of instances for an entity | | GET | [[InstanceDependency][api/instance/dependency/:entity-type/:entity-name]] | Return dependent instances for a given instance | http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/docs/src/site/twiki/restapi/Triage.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/Triage.twiki b/docs/src/site/twiki/restapi/Triage.twiki new file mode 100644 index 0000000..5646902 --- /dev/null +++ b/docs/src/site/twiki/restapi/Triage.twiki @@ -0,0 +1,44 @@ +---++ GET api/instance/triage/:entity-type/:entity-name + * <a href="#Description">Description</a> + * <a href="#Parameters">Parameters</a> + * <a href="#Results">Results</a> + * <a href="#Examples">Examples</a> + +---++ Description +Given a feed/process instance, this command traces its ancestors to find which ancestors have failed.
It's useful if +a lot of instances are failing in a pipeline, as it then finds the root cause of the pipeline being stuck. + + +---++ Parameters + * :entity-type type of entity (feed/process). + * :entity-name name of the feed/process. + * :start instance time of the entity instance. + * :colo <optional param> name of the colo on which you want to triage + +---++ Results +It returns a JSON graph + +---++ Examples +---+++ Rest Call +<verbatim> +GET http://localhost:15000/api/instance/triage/feed/my-feed?start=2015-03-02T00:00Z&colo=local +</verbatim> +---+++ Result +<verbatim> +{ + "vertices": ["(FEED) my-feed (2015-03-02T00:00Z) [Unavailable]", "(PROCESS) producer-process (2015-03-01T10:00Z) [TIMEDOUT]", "(FEED) input-feed-for-producer (2015-03-01T00:00Z) [Available]"], + "edges": + [ + { + "from" : "(PROCESS) producer-process (2015-03-01T10:00Z) [TIMEDOUT]", + "to" : "(FEED) my-feed (2015-03-02T00:00Z) [Unavailable]", + "label" : "produces" + }, + { + "from" : "(FEED) input-feed-for-producer (2015-03-01T00:00Z) [Available]", + "to" : "(PROCESS) producer-process (2015-03-01T10:00Z) [TIMEDOUT]", + "label" : "consumed by" + } + ] +} +</verbatim> http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index 1e813d2..13e0c82 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -23,16 +23,21 @@ import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; import org.apache.falcon.LifeCycle; import org.apache.falcon.Pair; +import org.apache.falcon.entity.EntityNotRegisteredException; import
org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.FeedInstanceStatus; import org.apache.falcon.entity.ProcessHelper; +import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.logging.LogProvider; import org.apache.falcon.resource.InstancesResult.Instance; @@ -51,10 +56,13 @@ import java.util.Calendar; import java.util.Collections; import java.util.Comparator; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Queue; import java.util.Set; /** @@ -523,6 +531,172 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + /** + * Triage method returns the graph of the ancestors in "UNSUCCESSFUL" state. + * + * It will traverse all the ancestor feed instances and process instances in the current instance's lineage. + * It stops following a lineage path at an AVAILABLE feed instance or a SUCCEEDED process instance, as this + * feature is intended to find the root cause of a pipeline failure. + * + * @param entityType type of the entity. Only feed and process are valid entity types for triage. + * @param entityName name of the entity. + * @param instanceTime time of the instance which should be used to triage. + * @return result holding one status-labelled lineage graph per cluster of the current deployment.
+ */ + public TriageResult triageInstance(String entityType, String entityName, String instanceTime, String colo) { + + checkColo(colo); + checkType(entityType); // should be only process/feed + checkName(entityName); + try { + EntityType type = EntityType.valueOf(entityType.toUpperCase()); + Entity entity = EntityUtil.getEntity(type, entityName); + TriageResult result = new TriageResult(APIResult.Status.SUCCEEDED, "Success"); + List<LineageGraphResult> triageGraphs = new LinkedList<>(); + for (String clusterName : DeploymentUtil.getCurrentClusters()) { + Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName); + triageGraphs.add(triage(type, entity, instanceTime, cluster)); + } + LineageGraphResult[] triageGraphsArray = new LineageGraphResult[triageGraphs.size()]; + result.setTriageGraphs(triageGraphs.toArray(triageGraphsArray)); + return result; + } catch (IllegalArgumentException e) { // bad entityType + LOG.error("Bad Entity Type: {}", entityType); + throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST); + } catch (EntityNotRegisteredException e) { // bad entityName + LOG.error("Bad Entity Name : {}", entityName); + throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST); + } catch (Throwable e) { + LOG.error("Failed to triage", e); + throw FalconWebException.newInstanceException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + private void checkName(String entityName) { + if (StringUtils.isBlank(entityName)) { + throw FalconWebException.newInstanceException("Entity name is mandatory and shouldn't be blank", + Response.Status.BAD_REQUEST); + } + } + + private LineageGraphResult triage(EntityType entityType, Entity entity, String instanceTime, Cluster cluster) + throws FalconException { + + Date instanceDate = SchemaHelper.parseDateUTC(instanceTime); + LineageGraphResult result = new LineageGraphResult(); + Set<String> vertices = new HashSet<>(); + Set<LineageGraphResult.Edge> edges =
new HashSet<>(); + Map<String, String> instanceStatusMap = new HashMap<>(); + + // queue containing all instances which need to be triaged + Queue<SchedulableEntityInstance> remainingInstances = new LinkedList<>(); + SchedulableEntityInstance currentInstance = new SchedulableEntityInstance(entity.getName(), cluster.getName(), + instanceDate, entityType); + remainingInstances.add(currentInstance); + + while (!remainingInstances.isEmpty()) { + currentInstance = remainingInstances.remove(); + if (currentInstance.getEntityType() == EntityType.FEED) { + Feed feed = ConfigurationStore.get().get(EntityType.FEED, currentInstance.getEntityName()); + FeedInstanceStatus.AvailabilityStatus status = getFeedInstanceStatus(feed, + currentInstance.getInstanceTime(), cluster); + + // add vertex to the graph + vertices.add(currentInstance.toString()); + instanceStatusMap.put(currentInstance.toString(), "[" + status.name() + "]"); + if (status == FeedInstanceStatus.AvailabilityStatus.AVAILABLE) { + continue; + } + + // find producer process instance and add it to the queue + SchedulableEntityInstance producerInstance = FeedHelper.getProducerInstance(feed, + currentInstance.getInstanceTime(), cluster); + if (producerInstance != null) { + remainingInstances.add(producerInstance); + + //add edge from producerProcessInstance to the feedInstance + LineageGraphResult.Edge edge = new LineageGraphResult.Edge(producerInstance.toString(), + currentInstance.toString(), "produces"); + edges.add(edge); + } + } else { // entity type is PROCESS + Process process = ConfigurationStore.get().get(EntityType.PROCESS, currentInstance.getEntityName()); + InstancesResult.WorkflowStatus status = getProcessInstanceStatus(process, + currentInstance.getInstanceTime()); + + // add current process instance as a vertex + vertices.add(currentInstance.toString()); + if (status == null) { + instanceStatusMap.put(currentInstance.toString(), "[ Not Available ]"); + } else { + 
instanceStatusMap.put(currentInstance.toString(), "[" + status.name() + "]"); + if (status == InstancesResult.WorkflowStatus.SUCCEEDED) { + continue; + } + } + + // find list of input feed instances - only mandatory ones and not optional ones + Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process, + currentInstance.getInstanceTime(), cluster, false); + for (SchedulableEntityInstance inputFeedInstance : inputFeedInstances) { + remainingInstances.add(inputFeedInstance); + + //Add edge from inputFeedInstance to consumer processInstance + LineageGraphResult.Edge edge = new LineageGraphResult.Edge(inputFeedInstance.toString(), + currentInstance.toString(), "consumed by"); + edges.add(edge); + } + } + } + + // append status to each vertex + Set<String> relabeledVertices = new HashSet<>(); + for (String instance : vertices) { + String status = instanceStatusMap.get(instance); + relabeledVertices.add(instance + status); + } + + // append status to each edge + for (LineageGraphResult.Edge edge : edges) { + String oldTo = edge.getTo(); + String oldFrom = edge.getFrom(); + + String newFrom = oldFrom + instanceStatusMap.get(oldFrom); + String newTo = oldTo + instanceStatusMap.get(oldTo); + + edge.setFrom(newFrom); + edge.setTo(newTo); + } + + result.setEdges(edges.toArray(new LineageGraphResult.Edge[0])); + result.setVertices(relabeledVertices.toArray(new String[0])); + return result; + } + + private FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, Date instanceTime, Cluster cluster) + throws FalconException { + Storage storage = FeedHelper.createStorage(cluster, feed); + Date endRange = new Date(instanceTime.getTime() + 200); + List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster.getName(), LocationType.DATA, + instanceTime, endRange); + return feedListing.get(0).getStatus(); + } + + private InstancesResult.WorkflowStatus getProcessInstanceStatus(Process process, Date instanceTime) + throws 
FalconException { + AbstractWorkflowEngine wfEngine = getWorkflowEngine(); + List<LifeCycle> lifeCycles = new ArrayList<LifeCycle>(); + lifeCycles.add(LifeCycle.valueOf(LifeCycle.EXECUTION.name())); + Date endRange = new Date(instanceTime.getTime() + 200); + Instance[] response = wfEngine.getStatus(process, instanceTime, endRange, lifeCycles).getInstances(); + if (response.length > 0) { + return response[0].getStatus(); + } + LOG.warn("No instances were found for the given process: {} & instanceTime: {}", process, instanceTime); + return null; + } + + public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, HttpServletRequest request, String colo, List<LifeCycle> lifeCycles, Boolean isForced) { http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java index 757fda8..0d59c22 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java @@ -30,6 +30,7 @@ import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.resource.TriageResult; import org.apache.falcon.resource.channel.Channel; import org.apache.falcon.resource.channel.ChannelFactory; import org.slf4j.Logger; @@ -375,6 +376,25 @@ public class InstanceManagerProxy extends AbstractInstanceManager { }.execute(colo, entityType, entityName); } + @GET + @Path("triage/{type}/{name}") + @Produces(MediaType.APPLICATION_JSON) + @Monitored(event = 
"triage-instance") + @Override + public TriageResult triageInstance( + @Dimension("type") @PathParam("type") final String entityType, + @Dimension("name") @PathParam("name") final String entityName, + @Dimension("instanceTime") @QueryParam("start") final String instanceTime, + @Dimension("colo") @QueryParam("colo") String colo) { + return new InstanceProxy<TriageResult>(TriageResult.class) { + @Override + protected TriageResult doExecute(String colo) throws FalconException { + return getInstanceManager(colo).invoke("triageInstance", entityType, entityName, instanceTime, colo); + } + }.execute(colo, entityType, entityName); + } + + //RESUME CHECKSTYLE CHECK ParameterNumberCheck private abstract class InstanceProxy<T extends APIResult> { http://git-wip-us.apache.org/repos/asf/falcon/blob/98e12502/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java index c2ac5b2..7249ba4 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java @@ -218,6 +218,19 @@ public class InstanceManager extends AbstractInstanceManager { return super.resumeInstance(request, type, entity, startStr, endStr, colo, lifeCycles); } + @GET + @Path("triage/{type}/{name}") + @Produces(MediaType.APPLICATION_JSON) + @Monitored(event = "triage-instance") + @Override + public TriageResult triageInstance( + @Dimension("type") @PathParam("type") String entityType, + @Dimension("name") @PathParam("name") String entityName, + @Dimension("instanceTime") @QueryParam("start") String instanceTime, + @Dimension("colo") @QueryParam("colo") String colo) { + return super.triageInstance(entityType, entityName, instanceTime, colo); + } + @POST @Path("rerun/{type}/{entity}") 
@Produces(MediaType.APPLICATION_JSON)
