This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4c3496a [GOBBLIN-1457] Lazy-load issues to improve performance (#3404)
4c3496a is described below
commit 4c3496a5cbadccffdbf2e0993110f5cfbd0d0844
Author: Alex Prokofiev <[email protected]>
AuthorDate: Mon Sep 27 13:59:16 2021 -0700
[GOBBLIN-1457] Lazy-load issues to improve performance (#3404)
Previously, we loaded issues from the db on batch execution requests.
This can result in long response times when large number of flows
are queried.
Now the users can specify if they want to retrieve them, or omit
and speed up the query.
---
...he.gobblin.service.flowexecutions.restspec.json | 16 +++++++--
...he.gobblin.service.flowexecutions.snapshot.json | 14 ++++++--
.../org/apache/gobblin/service/FlowStatusTest.java | 40 ++++++++++++++--------
.../gobblin/service/FlowExecutionResource.java | 20 ++++++++---
.../service/FlowExecutionResourceHandler.java | 6 ++--
.../service/FlowExecutionResourceLocalHandler.java | 31 ++++++++++++-----
.../apache/gobblin/service/FlowStatusResource.java | 2 +-
.../gobblin/service/monitoring/JobStatus.java | 4 ++-
.../service/monitoring/JobStatusRetriever.java | 24 ++++++++-----
...GobblinServiceFlowExecutionResourceHandler.java | 10 +++---
10 files changed, 117 insertions(+), 50 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
index c9cc039..3038c58 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -20,9 +20,9 @@
} ],
"finders" : [ {
"name" : "latestFlowExecution",
+ "doc" : "Retrieve the most recent matching FlowExecution(s) of the
identified FlowId",
"parameters" : [ {
"name" : "flowId",
- "doc" : "Retrieve the most recent matching FlowExecution(s) of the
identified FlowId",
"type" : "org.apache.gobblin.service.FlowId"
}, {
"name" : "count",
@@ -36,6 +36,11 @@
"name" : "executionStatus",
"type" : "string",
"optional" : true
+ }, {
+ "name" : "includeIssues",
+ "type" : "boolean",
+ "default" : "false",
+ "doc" : "include job issues in the response. Otherwise empty array of
issues will be returned."
} ]
}, {
"name" : "latestFlowGroupExecutions",
@@ -45,13 +50,18 @@
"type" : "string"
}, {
"name" : "countPerFlow",
- "doc" : "(maximum) number of FlowExecutions for each flow in
flowGroup",
"type" : "int",
- "optional" : true
+ "optional" : true,
+ "doc" : "(maximum) number of FlowExecutions for each flow in flowGroup
*"
}, {
"name" : "tag",
"type" : "string",
"optional" : true
+ }, {
+ "name" : "includeIssues",
+ "type" : "boolean",
+ "default" : "false",
+ "doc" : "include job issues in the response. Otherwise empty array of
issues will be returned."
} ]
} ],
"entity" : {
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 767ebaa..b2cdddc 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -298,6 +298,11 @@
"name" : "executionStatus",
"type" : "string",
"optional" : true
+ }, {
+ "name" : "includeIssues",
+ "type" : "boolean",
+ "default" : "false",
+ "doc" : "include job issues in the response. Otherwise empty array
of issues will be returned."
} ]
}, {
"name" : "latestFlowGroupExecutions",
@@ -308,12 +313,17 @@
}, {
"name" : "countPerFlow",
"type" : "int",
- "doc" : "(maximum) number of FlowExecutions for each flow in
flowGroup",
- "optional" : true
+ "optional" : true,
+ "doc" : "(maximum) number of FlowExecutions for each flow in
flowGroup *"
}, {
"name" : "tag",
"type" : "string",
"optional" : true
+ }, {
+ "name" : "includeIssues",
+ "type" : "boolean",
+ "default" : "false",
+ "doc" : "include job issues in the response. Otherwise empty array
of issues will be returned."
} ]
} ],
"entity" : {
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index 1bf8743..a1b0309 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -28,6 +28,7 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Suppliers;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
@@ -127,29 +128,32 @@ public class FlowStatusTest {
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test
message 1").processedCount(100)
-
.jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
- .build();
+
.jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2")
+ .issues(Suppliers.ofInstance(Collections.emptyList())).build();
org.apache.gobblin.service.monitoring.JobStatus fs1 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(5000L)
-
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).issues(Collections.emptyList()).build();
+ .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0)
+ .issues(Suppliers.ofInstance(Collections.emptyList())).build();
org.apache.gobblin.service.monitoring.JobStatus js2 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup("jgroup1").jobName("job1").jobTag("dataset1").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test
message 2").processedCount(200)
-
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3")
+ .issues(Suppliers.ofInstance(Collections.emptyList()))
.build();
org.apache.gobblin.service.monitoring.JobStatus js3 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup("jgroup1").jobName("job2").jobTag("dataset2").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test
message 3").processedCount(200)
-
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3")
+ .issues(Suppliers.ofInstance(Collections.emptyList()))
.build();
org.apache.gobblin.service.monitoring.JobStatus fs2 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Flow
message")
- .issues(Collections.emptyList()).build();
+ .issues(Suppliers.ofInstance(Collections.emptyList())).build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 =
Lists.newArrayList(js1, fs1);
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 =
Lists.newArrayList(js2, js3, fs2);
_listOfJobStatusLists = Lists.newArrayList();
@@ -198,19 +202,21 @@ public class FlowStatusTest {
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test
message 1").processedCount(100)
-
.jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+
.jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2")
+ .issues(Suppliers.ofInstance(Collections.emptyList()))
.build();
org.apache.gobblin.service.monitoring.JobStatus js2 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test
message 2").processedCount(200)
-
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3")
+ .issues(Suppliers.ofInstance(Collections.emptyList()))
.build();
org.apache.gobblin.service.monitoring.JobStatus fs1 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Flow
message")
- .issues(Collections.emptyList()).build();
+ .issues(Suppliers.ofInstance(Collections.emptyList())).build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList =
Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -247,19 +253,21 @@ public class FlowStatusTest {
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
.eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Test
message 1").processedCount(100)
-
.jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+
.jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2")
+ .issues(Suppliers.ofInstance(Collections.emptyList()))
.build();
org.apache.gobblin.service.monitoring.JobStatus js2 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test
message 2").processedCount(200)
-
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3")
+ .issues(Suppliers.ofInstance(Collections.emptyList()))
.build();
org.apache.gobblin.service.monitoring.JobStatus fs1 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY)
.eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Flow
message")
- .issues(Collections.emptyList()).build();
+ .issues(Suppliers.ofInstance(Collections.emptyList())).build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList =
Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -296,19 +304,21 @@ public class FlowStatusTest {
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test
message 1").processedCount(100)
-
.jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").issues(Collections.emptyList())
+
.jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2")
+ .issues(Suppliers.ofInstance(Collections.emptyList()))
.build();
org.apache.gobblin.service.monitoring.JobStatus js2 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
.eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Test
message 2").processedCount(200)
-
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").issues(Collections.emptyList())
+
.jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3")
+ .issues(Suppliers.ofInstance(Collections.emptyList()))
.build();
org.apache.gobblin.service.monitoring.JobStatus fs1 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1").flowName("flow1")
.jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
.eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Flow
message")
- .issues(Collections.emptyList()).build();
+ .issues(Suppliers.ofInstance(Collections.emptyList())).build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList =
Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 12ace49..0f276b5 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -57,16 +57,28 @@ public class FlowExecutionResource extends
ComplexKeyResourceTemplate<FlowStatus
return this.flowExecutionResourceHandler.get(key);
}
+ /**
+ * Retrieve the most recent matching FlowExecution(s) of the identified
FlowId
+ * @param includeIssues include job issues in the response. Otherwise empty
array of issues will be returned.
+ */
@Finder("latestFlowExecution")
public List<FlowExecution> getLatestFlowExecution(@Context PagingContext
context, @QueryParam("flowId") FlowId flowId,
- @Optional @QueryParam("count") Integer count, @Optional
@QueryParam("tag") String tag, @Optional @QueryParam("executionStatus") String
executionStatus) {
- return this.flowExecutionResourceHandler.getLatestFlowExecution(context,
flowId, count, tag, executionStatus);
+ @Optional @QueryParam("count") Integer count, @Optional
@QueryParam("tag") String tag, @Optional @QueryParam("executionStatus") String
executionStatus,
+ @Optional("false") @QueryParam("includeIssues") Boolean includeIssues) {
+ return this.flowExecutionResourceHandler.getLatestFlowExecution(context,
flowId, count, tag, executionStatus, includeIssues);
}
+ /**
+ * Retrieve the most recent matching FlowExecution(s) for each flow in the
identified flowGroup
+ * @param countPerFlow (maximum) number of FlowExecutions for each flow in
flowGroup *
+ * @param includeIssues include job issues in the response. Otherwise empty
array of issues will be returned.
+ * @return
+ */
@Finder("latestFlowGroupExecutions")
public List<FlowExecution> getLatestFlowGroupExecutions(@Context
PagingContext context, @QueryParam("flowGroup") String flowGroup,
- @Optional @QueryParam("countPerFlow") Integer countPerFlow, @Optional
@QueryParam("tag") String tag) {
- return
this.flowExecutionResourceHandler.getLatestFlowGroupExecutions(context,
flowGroup, countPerFlow, tag);
+ @Optional @QueryParam("countPerFlow") Integer countPerFlow, @Optional
@QueryParam("tag") String tag,
+ @Optional("false") @QueryParam("includeIssues") Boolean includeIssues) {
+ return
this.flowExecutionResourceHandler.getLatestFlowGroupExecutions(context,
flowGroup, countPerFlow, tag, includeIssues);
}
/**
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
index ea3e9ef..8b2935f 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
@@ -34,7 +34,8 @@ public interface FlowExecutionResourceHandler {
/**
* Get latest {@link FlowExecution}
*/
- public List<FlowExecution> getLatestFlowExecution(PagingContext context,
FlowId flowId, Integer count, String tag, String executionStatus);
+ public List<FlowExecution> getLatestFlowExecution(PagingContext context,
FlowId flowId, Integer count, String tag,
+ String executionStatus, Boolean includeIssues);
/**
* Get latest {@link FlowExecution} for every flow in `flowGroup`
@@ -42,7 +43,8 @@ public interface FlowExecutionResourceHandler {
* NOTE: `executionStatus` param not provided yet, without justifying use
case, due to complexity of interaction with `countPerFlow`
* and resulting efficiency concern of performing across many flows sharing
the single named group.
*/
- public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext
context, String flowGroup, Integer countPerFLow, String tag);
+ public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext
context, String flowGroup, Integer countPerFLow,
+ String tag, Boolean includeIssues);
/**
* Resume a failed {@link FlowExecution} from the point before failure
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index ba5aaf1..5237fb2 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -56,7 +56,7 @@ public class FlowExecutionResourceLocalHandler implements
FlowExecutionResourceH
@Override
public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
- FlowExecution flowExecution =
convertFlowStatus(getFlowStatusFromGenerator(key, this.flowStatusGenerator));
+ FlowExecution flowExecution =
convertFlowStatus(getFlowStatusFromGenerator(key, this.flowStatusGenerator),
true);
if (flowExecution == null) {
throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow
execution found for flowStatusId " + key.getKey()
+ ". The flowStatusId may be incorrect, or the flow execution may
have been cleaned up.");
@@ -65,11 +65,14 @@ public class FlowExecutionResourceLocalHandler implements
FlowExecutionResourceH
}
@Override
- public List<FlowExecution> getLatestFlowExecution(PagingContext context,
FlowId flowId, Integer count, String tag, String executionStatus) {
+ public List<FlowExecution> getLatestFlowExecution(PagingContext context,
FlowId flowId, Integer count, String tag,
+ String executionStatus, Boolean includeIssues) {
List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus,
this.flowStatusGenerator);
if (flowStatuses != null) {
- return
flowStatuses.stream().map(FlowExecutionResourceLocalHandler::convertFlowStatus).collect(Collectors.toList());
+ return flowStatuses.stream()
+ .map((FlowStatus monitoringFlowStatus) ->
convertFlowStatus(monitoringFlowStatus, includeIssues))
+ .collect(Collectors.toList());
}
throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow
execution found for flowId " + flowId
@@ -78,12 +81,15 @@ public class FlowExecutionResourceLocalHandler implements
FlowExecutionResourceH
}
@Override
- public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext
context, String flowGroup, Integer countPerFlow, String tag) {
+ public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext
context, String flowGroup, Integer countPerFlow,
+ String tag, Boolean includeIssues) {
List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses =
getLatestFlowGroupStatusesFromGenerator(flowGroup, countPerFlow, tag,
this.flowStatusGenerator);
if (flowStatuses != null) {
- return
flowStatuses.stream().map(FlowExecutionResourceLocalHandler::convertFlowStatus).collect(Collectors.toList());
+ return flowStatuses.stream()
+ .map((FlowStatus monitoringFlowStatus) ->
convertFlowStatus(monitoringFlowStatus, includeIssues))
+ .collect(Collectors.toList());
}
throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow
executions found for flowGroup " + flowGroup
@@ -137,7 +143,8 @@ public class FlowExecutionResourceLocalHandler implements
FlowExecutionResourceH
* @param monitoringFlowStatus
* @return a {@link FlowExecution} converted from a {@link
org.apache.gobblin.service.monitoring.FlowStatus}
*/
- public static FlowExecution
convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus
monitoringFlowStatus) {
+ public static FlowExecution
convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus
monitoringFlowStatus,
+ boolean includeIssues) {
if (monitoringFlowStatus == null) {
return null;
}
@@ -183,9 +190,15 @@ public class FlowExecutionResourceLocalHandler implements
FlowExecutionResourceH
.setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName()))
.setMessage(queriedJobStatus.getMessage())
.setJobState(new
JobState().setLowWatermark(queriedJobStatus.getLowWatermark()).
- setHighWatermark(queriedJobStatus.getHighWatermark()))
- .setIssues(new IssueArray(queriedJobStatus.getIssues().stream()
-
.map(FlowExecutionResourceLocalHandler::convertIssueToRestApiObject).collect(Collectors.toList())));
+ setHighWatermark(queriedJobStatus.getHighWatermark()));
+
+ if (includeIssues) {
+ jobStatus.setIssues(new
IssueArray(queriedJobStatus.getIssues().get().stream()
+
.map(FlowExecutionResourceLocalHandler::convertIssueToRestApiObject)
+ .collect(Collectors.toList())));
+ } else {
+ jobStatus.setIssues(new IssueArray());
+ }
if (!Strings.isNullOrEmpty(queriedJobStatus.getMetrics())) {
jobStatus.setMetrics(queriedJobStatus.getMetrics());
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 48ad8e8..483648f 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -82,7 +82,7 @@ public class FlowStatusResource extends
ComplexKeyResourceTemplate<FlowStatusId,
if (monitoringFlowStatus == null) {
return null;
}
- FlowExecution flowExecution =
FlowExecutionResourceLocalHandler.convertFlowStatus(monitoringFlowStatus);
+ FlowExecution flowExecution =
FlowExecutionResourceLocalHandler.convertFlowStatus(monitoringFlowStatus,
false);
return new FlowStatus()
.setId(flowExecution.getId())
.setExecutionStatistics(flowExecution.getExecutionStatistics())
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index ebb710b..6917e92 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.service.monitoring;
import java.util.List;
+import com.google.common.base.Supplier;
+
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
@@ -54,7 +56,7 @@ public class JobStatus {
private final int maxAttempts;
private final int currentAttempts;
private final boolean shouldRetry;
- private final List<Issue> issues;
+ private final Supplier<List<Issue>> issues;
private final int progressPercentage;
private final long lastProgressEventTime;
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 346474b..8452e9a 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -25,6 +25,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.typesafe.config.ConfigFactory;
@@ -121,21 +123,25 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
int progressPercentage =
jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
long lastProgressEventTime =
jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, 0);
+ String contextId =
TroubleshooterUtils.getContextIdForJob(jobState.getProperties());
- List<Issue> issues;
- try {
- String contextId =
TroubleshooterUtils.getContextIdForJob(jobState.getProperties());
- issues = issueRepository.getAll(contextId);
- } catch (TroubleshooterException e) {
- log.warn("Cannot retrieve job issues", e);
- issues = Collections.emptyList();
- }
+ Supplier<List<Issue>> jobIssues = Suppliers.memoize(() -> {
+ List<Issue> issues;
+ try {
+ issues = issueRepository.getAll(contextId);
+ } catch (TroubleshooterException e) {
+ log.warn("Cannot retrieve job issues", e);
+ issues = Collections.emptyList();
+ }
+ return issues;
+ });
return
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
-
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).issues(issues).build();
+
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).
+ issues(jobIssues).build();
}
protected final String getFlowGroup(State jobState) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index cb4a779..15ffc2f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -73,13 +73,15 @@ public class GobblinServiceFlowExecutionResourceHandler
implements FlowExecution
}
@Override
- public List<FlowExecution> getLatestFlowExecution(PagingContext context,
FlowId flowId, Integer count, String tag, String executionStatus) {
- return this.localHandler.getLatestFlowExecution(context, flowId, count,
tag, executionStatus);
+ public List<FlowExecution> getLatestFlowExecution(PagingContext context,
FlowId flowId, Integer count, String tag,
+ String executionStatus, Boolean includeIssues) {
+ return this.localHandler.getLatestFlowExecution(context, flowId, count,
tag, executionStatus, includeIssues);
}
@Override
- public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext
context, String flowGroup, Integer countPerFlow, String tag) {
- return this.localHandler.getLatestFlowGroupExecutions(context, flowGroup,
countPerFlow, tag);
+ public List<FlowExecution> getLatestFlowGroupExecutions(PagingContext
context, String flowGroup, Integer countPerFlow,
+ String tag, Boolean includeIssues) {
+ return this.localHandler.getLatestFlowGroupExecutions(context, flowGroup,
countPerFlow, tag, includeIssues);
}
@Override