This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new ae7dad0 [GOBBLIN-1075] Add option to return latest failed flows ae7dad0 is described below commit ae7dad01599a255da05f4a44fe1a0093c75446fa Author: Jack Moseley <jmose...@linkedin.com> AuthorDate: Mon Mar 16 16:32:29 2020 -0700 [GOBBLIN-1075] Add option to return latest failed flows Closes #2915 from jack-moseley/failed-flows --- ...he.gobblin.service.flowexecutions.restspec.json | 4 ++ ...he.gobblin.service.flowexecutions.snapshot.json | 4 ++ .../gobblin/service/FlowExecutionClient.java | 11 +++-- .../gobblin/service/FlowExecutionResource.java | 10 ++--- .../apache/gobblin/service/FlowStatusResource.java | 2 +- .../service/monitoring/FlowStatusGenerator.java | 49 ++++++++++++++++++++++ 6 files changed, 71 insertions(+), 9 deletions(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json index 210e2b0..2111064 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json @@ -31,6 +31,10 @@ "name" : "tag", "type" : "string", "optional" : true + }, { + "name" : "executionStatus", + "type" : "string", + "optional" : true } ] } ], "entity" : { diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json index 895d706..ed12a21 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json @@ -238,6 +238,10 @@ "name" : "tag", "type" : "string", "optional" : true + }, { + "name" : "executionStatus", + "type" : "string", + "optional" : true } ] } ], "entity" : { diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java index 5cbd0f0..c78e26a 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java @@ -128,20 +128,25 @@ public class FlowExecutionClient implements Closeable { } } + public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag) throws RemoteInvocationException { + return getLatestFlowExecution(flowId, count, tag, null); + } + /** * Get the latest k flow executions * @param flowId identifier of flow execution to get * @return a list of {@link FlowExecution}es corresponding to the latest <code>count</code> executions, containing only - * jobStatuses that match the given tag. + * jobStatuses that match the given tag. If <code>executionStatus</code> is not null, only flows with that status are + * returned. * @throws RemoteInvocationException */ - public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag) + public List<FlowExecution> getLatestFlowExecution(FlowId flowId, Integer count, String tag, String executionStatus) throws RemoteInvocationException { LOG.debug("getFlowExecution with groupName " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + Integer.toString(count)); FindRequest<FlowExecution> findRequest = _flowexecutionsRequestBuilders.findByLatestFlowExecution().flowIdParam(flowId). - addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).build(); + addReqParam("count", count, Integer.class).addParam("tag", tag, String.class).addParam("executionStatus", executionStatus, String.class).build(); Response<CollectionResponse<FlowExecution>> response = _restClient.get().sendRequest(findRequest).getResponse(); diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java index 43c926b..fcdf6a1 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java @@ -70,9 +70,9 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus } @Finder("latestFlowExecution") - public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context, - @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) { - List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator); + public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context, @QueryParam("flowId") FlowId flowId, + @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag, @Optional @QueryParam("executionStatus") String executionStatus) { + List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus, this._flowStatusGenerator); if (flowStatuses != null) { return flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList()); @@ -108,13 +108,13 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus } public static List<org.apache.gobblin.service.monitoring.FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId, - Integer count, String tag, FlowStatusGenerator flowStatusGenerator) { + Integer count, String tag, String executionStatus, FlowStatusGenerator flowStatusGenerator) { if (count == null) { count = 1; } LOG.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count); - return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag); + return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag, executionStatus); } /** diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java index 6047b24..4c8c623 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java @@ -64,7 +64,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, @Finder("latestFlowStatus") public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context, @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) { - List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, this._flowStatusGenerator); + List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, null, this._flowStatusGenerator); if (flowStatuses != null) { return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList()); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java index a7a451d..785c351 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java @@ -17,6 +17,7 @@ package org.apache.gobblin.service.monitoring; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -37,6 +38,7 @@ import org.apache.gobblin.annotation.Alpha; @Builder public class FlowStatusGenerator { public static final List<String> FINISHED_STATUSES = Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED"); + public static final int MAX_LOOKBACK = 100; private final JobStatusRetriever jobStatusRetriever; private final EventBus eventBus; @@ -64,6 +66,53 @@ public class FlowStatusGenerator { } /** + * Get the flow statuses of last <code>count</code> (or fewer) executions + * @param flowName + * @param flowGroup + * @param count + * @param tag + * @param executionStatus + * @return the latest <code>count</code>{@link FlowStatus}es. null is returned if there is no flow execution found. + * If tag is not null, the job status list only contains jobs matching the tag. + * If executionStatus is not null, the latest <code>count</code> flow statuses with that status are returned (as long + * as they are within the last {@link #MAX_LOOKBACK} executions for this flow). + */ + public List<FlowStatus> getLatestFlowStatus(String flowName, String flowGroup, int count, String tag, String executionStatus) { + if (executionStatus == null) { + return getLatestFlowStatus(flowName, flowGroup, count, tag); + } else { + List<FlowStatus> flowStatuses = getLatestFlowStatus(flowName, flowGroup, MAX_LOOKBACK, tag); + if (flowStatuses == null) { + return null; + } + List<FlowStatus> matchingFlowStatuses = new ArrayList<>(); + + for (FlowStatus flowStatus : flowStatuses) { + if (matchingFlowStatuses.size() == count) { + return matchingFlowStatuses; + } + + String executionStatusFromFlow = getExecutionStatus(flowStatus); + if (executionStatusFromFlow.equals(executionStatus)) { + matchingFlowStatuses.add(getFlowStatus(flowName, flowGroup, flowStatus.getFlowExecutionId(), tag)); + } + } + + return matchingFlowStatuses; + } + } + + /** + * Return the executionStatus of the given {@link FlowStatus}. Note that the {@link FlowStatus#jobStatusIterator} + * will have it's cursor moved forward by this. + */ + private String getExecutionStatus(FlowStatus flowStatus) { + List<JobStatus> jobStatuses = Lists.newArrayList(flowStatus.getJobStatusIterator()); + jobStatuses = jobStatuses.stream().filter(JobStatusRetriever::isFlowStatus).collect(Collectors.toList()); + return jobStatuses.isEmpty() ? "" : jobStatuses.get(0).getEventName(); + } + + /** * Get the flow status for a specific execution. * @param flowName * @param flowGroup