This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 93f9a81 [GOBBLIN-1154] Improve gaas error messages
93f9a81 is described below
commit 93f9a817ff9cc259734eec81e4cb7408aab8cc98
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Jun 1 18:27:07 2020 -0700
[GOBBLIN-1154] Improve gaas error messages
Closes #2993 from jack-moseley/gaas-improved-errors
---
.../apache/gobblin/metrics/event/TimingEvent.java | 1 +
.../org/apache/gobblin/service/FlowStatusTest.java | 23 ++++++-----------
.../service/FlowConfigV2ResourceLocalHandler.java | 12 ++++++---
.../gobblin/service/FlowExecutionResource.java | 29 ++++++++++-----------
.../gobblin/service/modules/flowgraph/Dag.java | 7 +++++
.../service/modules/orchestration/DagManager.java | 30 ++++++++++++++++++----
.../modules/orchestration/DagManagerUtils.java | 9 +++++++
.../modules/orchestration/Orchestrator.java | 7 +++++
.../monitoring/KafkaAvroJobStatusMonitor.java | 1 +
9 files changed, 81 insertions(+), 38 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 0664c2f..45af9ad 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -70,6 +70,7 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static final String FLOW_SUCCEEDED = "FlowSucceeded";
public static final String FLOW_FAILED = "FlowFailed";
public static final String FLOW_RUNNING = "FlowRunning";
+ public static final String FLOW_CANCELLED = "FlowCancelled";
}
public static class FlowEventConstants {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index b008f83..e133582 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -28,7 +28,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.base.Joiner;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
@@ -38,7 +37,6 @@ import com.google.inject.Module;
import com.google.inject.name.Names;
import com.linkedin.restli.server.resources.BaseResource;
-import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.restli.EmbeddedRestliServer;
@@ -50,7 +48,6 @@ public class FlowStatusTest {
private FlowStatusClient _client;
private EmbeddedRestliServer _server;
private List<List<org.apache.gobblin.service.monitoring.JobStatus>>
_listOfJobStatusLists;
- private Joiner messageJoiner;
class TestJobStatusRetriever extends JobStatusRetriever {
@Override
@@ -85,8 +82,6 @@ public class FlowStatusTest {
@BeforeClass
public void setUp() throws Exception {
- ConfigBuilder configBuilder = ConfigBuilder.create();
-
JobStatusRetriever jobStatusRetriever = new TestJobStatusRetriever();
final FlowStatusGenerator flowStatusGenerator =
FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
@@ -107,8 +102,6 @@ public class FlowStatusTest {
_client =
new FlowStatusClient(String.format("http://localhost:%s/",
_server.getPort()));
-
- messageJoiner = Joiner.on(FlowStatusResource.MESSAGE_SEPARATOR);
}
/**
@@ -134,7 +127,7 @@ public class FlowStatusTest {
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
org.apache.gobblin.service.monitoring.JobStatus fs2 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).build();
+
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Flow
message").build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 =
Lists.newArrayList(js1, fs1);
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 =
Lists.newArrayList(js2, js3, fs2);
_listOfJobStatusLists = Lists.newArrayList();
@@ -148,7 +141,7 @@ public class FlowStatusTest {
Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(),
1L);
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(),
7000L);
- Assert.assertEquals(flowStatus.getMessage(),
messageJoiner.join(js2.getMessage(), js3.getMessage()));
+ Assert.assertEquals(flowStatus.getMessage(), fs2.getMessage());
Assert.assertEquals(flowStatus.getExecutionStatus(),
ExecutionStatus.COMPLETE);
JobStatusArray jobStatuses = flowStatus.getJobStatuses();
@@ -189,7 +182,7 @@ public class FlowStatusTest {
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
org.apache.gobblin.service.monitoring.JobStatus fs1 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
- .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
+
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Flow
message").build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList =
Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -201,7 +194,7 @@ public class FlowStatusTest {
Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(),
0L);
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(),
7000L);
- Assert.assertEquals(flowStatus.getMessage(),
messageJoiner.join(js1.getMessage(), js2.getMessage()));
+ Assert.assertEquals(flowStatus.getMessage(), fs1.getMessage());
Assert.assertEquals(flowStatus.getExecutionStatus(),
ExecutionStatus.COMPLETE);
JobStatusArray jobStatuses = flowStatus.getJobStatuses();
@@ -232,7 +225,7 @@ public class FlowStatusTest {
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
org.apache.gobblin.service.monitoring.JobStatus fs1 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY)
- .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).build();
+
.eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Flow
message").build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList =
Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -244,7 +237,7 @@ public class FlowStatusTest {
Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(),
0L);
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(),
0L);
- Assert.assertEquals(flowStatus.getMessage(),
messageJoiner.join(js1.getMessage(), js2.getMessage()));
+ Assert.assertEquals(flowStatus.getMessage(), fs1.getMessage());
Assert.assertEquals(flowStatus.getExecutionStatus(),
ExecutionStatus.RUNNING);
JobStatusArray jobStatuses = flowStatus.getJobStatuses();
@@ -275,7 +268,7 @@ public class FlowStatusTest {
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
org.apache.gobblin.service.monitoring.JobStatus fs1 =
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
- .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).build();
+
.eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Flow
message").build();
List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList =
Lists.newArrayList(js1, js2, fs1);
_listOfJobStatusLists = Lists.newArrayList();
_listOfJobStatusLists.add(jobStatusList);
@@ -287,7 +280,7 @@ public class FlowStatusTest {
Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(),
0L);
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(),
7000L);
- Assert.assertEquals(flowStatus.getMessage(),
messageJoiner.join(js1.getMessage(), js2.getMessage()));
+ Assert.assertEquals(flowStatus.getMessage(), fs1.getMessage());
Assert.assertEquals(flowStatus.getExecutionStatus(),
ExecutionStatus.FAILED);
JobStatusArray jobStatuses = flowStatus.getJobStatuses();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index efdb990..0990338 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -24,6 +24,7 @@ import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateKVResponse;
+import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
import lombok.extern.slf4j.Slf4j;
@@ -62,8 +63,9 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
// Return conflict and take no action if flowSpec has already been created
if (this.flowCatalog.exists(flowSpec.getUri())) {
- log.warn("Flowspec with URI {} already exists, no action will be taken",
flowSpec.getUri());
- return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(),
flowStatusId), flowConfig, HttpStatus.S_409_CONFLICT);
+ log.warn("FlowSpec with URI {} already exists, no action will be taken",
flowSpec.getUri());
+ throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
+ "FlowSpec with URI " + flowSpec.getUri() + " already exists, no
action will be taken");
}
Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec,
triggerListener);
@@ -81,7 +83,11 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
} else if
(Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL,
new AddSpecResponse<>("false")).getValue().toString())) {
httpStatus = HttpStatus.S_201_CREATED;
} else {
- httpStatus = HttpStatus.S_400_BAD_REQUEST;
+ String message = "Flow was not compiled successfully. It may be due to
no path being found";
+ if (!flowSpec.getCompilationErrors().isEmpty()) {
+ message = message + " or due to " + flowSpec.getCompilationErrors();
+ }
+ throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, message);
}
return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(),
flowStatusId), flowConfig, httpStatus);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index fcdf6a1..afd55a2 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -32,6 +32,7 @@ import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.Context;
import com.linkedin.restli.server.annotations.Finder;
@@ -65,8 +66,12 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
*/
@Override
public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
- // this returns null to raise a 404 error if flowStatus is null
- return convertFlowStatus(getFlowStatusFromGenerator(key,
this._flowStatusGenerator));
+ FlowExecution flowExecution =
convertFlowStatus(getFlowStatusFromGenerator(key, this._flowStatusGenerator));
+ if (flowExecution == null) {
+ throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow
execution found for flowStatusId " + key.getKey()
+ + ". The flowStatusId may be incorrect, or the flow execution may have
been cleaned up.");
+ }
+ return flowExecution;
}
@Finder("latestFlowExecution")
@@ -78,8 +83,8 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
return
flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
}
- // will return 404 status code
- return null;
+ throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow
execution found for flowId " + flowId
+ + ". The flowId may be incorrect, or the flow execution may have been
cleaned up.");
}
/**
@@ -135,7 +140,7 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
long flowEndTime = 0L;
ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
- StringBuffer flowMessagesStringBuffer = new StringBuffer();
+ String flowMessage = "";
while (jobStatusIter.hasNext()) {
org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus =
jobStatusIter.next();
@@ -144,6 +149,9 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
flowEndTime = queriedJobStatus.getEndTime();
flowExecutionStatus =
ExecutionStatus.valueOf(queriedJobStatus.getEventName());
+ if (queriedJobStatus.getMessage() != null) {
+ flowMessage = queriedJobStatus.getMessage();
+ }
continue;
}
@@ -163,25 +171,16 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
setHighWatermark(queriedJobStatus.getHighWatermark()));
jobStatusArray.add(jobStatus);
-
- if (!queriedJobStatus.getMessage().isEmpty()) {
- flowMessagesStringBuffer.append(queriedJobStatus.getMessage());
- flowMessagesStringBuffer.append(MESSAGE_SEPARATOR);
- }
}
jobStatusArray.sort(Comparator.comparing((JobStatus js) ->
js.getExecutionStatistics().getExecutionStartTime()));
- String flowMessages = flowMessagesStringBuffer.length() > 0 ?
- flowMessagesStringBuffer.substring(0,
flowMessagesStringBuffer.length() -
- MESSAGE_SEPARATOR.length()) : StringUtils.EMPTY;
-
return new FlowExecution()
.setId(new
FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
.setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
.setExecutionStatistics(new
FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
.setExecutionEndTime(flowEndTime))
- .setMessage(flowMessages)
+ .setMessage(flowMessage)
.setExecutionStatus(flowExecutionStatus)
.setJobStatuses(jobStatusArray);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 745a15d..a1f0d2c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -31,8 +31,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.Getter;
+import lombok.Setter;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ExecutionStatus;
/**
@@ -48,6 +50,11 @@ public class Dag<T> {
private Map<DagNode, List<DagNode<T>>> parentChildMap;
private List<DagNode<T>> nodes;
+ @Setter
+ private String message;
+ @Setter
+ private String flowEvent;
+
public Dag(List<DagNode<T>> dagNodes) {
this.nodes = dagNodes;
//Build dag
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f323ee3..db1145a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -476,6 +476,9 @@ public class DagManager extends AbstractIdleService {
for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
cancelDagNode(dagNodeToCancel);
}
+
+
this.dags.get(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ this.dags.get(dagToCancel).setMessage("Flow killed by request");
} else {
log.warn("Did not find Dag with id {}, it might be already
cancelled/finished.", dagToCancel);
}
@@ -638,6 +641,11 @@ public class DagManager extends AbstractIdleService {
DagManagerUtils.getFullyQualifiedDagName(node),
timeOutForJobStart);
cancelDagNode(node);
+
+ String dagId = DagManagerUtils.generateDagId(node);
+
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ this.dags.get(dagId).setMessage("Flow killed because no update
received for " + timeOutForJobStart + " ms after orchestration");
+
return true;
} else {
return false;
@@ -680,10 +688,13 @@ public class DagManager extends AbstractIdleService {
if (flowSla != DagManagerUtils.NO_SLA && currentTime > flowStartTime +
flowSla) {
log.info("Flow {} exceeded the SLA of {} ms. Killing the job {}
now...",
-
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
-
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY),
- flowSla);
+
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
flowSla,
+
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
cancelDagNode(node);
+
+
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ this.dags.get(dagId).setMessage("Flow killed due to exceeding SLA of "
+ flowSla + " ms");
+
return true;
}
return false;
@@ -779,7 +790,9 @@ public class DagManager extends AbstractIdleService {
} catch (Exception e) {
TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
- log.error("Cannot submit job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri, e);
+ String message = "Cannot submit job " +
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " +
specExecutorUri;
+ log.error(message, e);
+ jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " +
e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
}
@@ -863,8 +876,15 @@ public class DagManager extends AbstractIdleService {
switch (jobStatus) {
// TODO : For now treat canceled as failed, till we introduce failure
option - CANCEL
- case CANCELLED:
case FAILED:
+ dag.setMessage("Flow failed because job " + jobName + " failed");
+ if (DagManagerUtils.getFailureOption(dag) ==
FailureOption.FINISH_RUNNING) {
+ this.failedDagIdsFinishRunning.add(dagId);
+ } else {
+ this.failedDagIdsFinishAllPossible.add(dagId);
+ }
+ return Maps.newHashMap();
+ case CANCELLED:
if (DagManagerUtils.getFailureOption(dag) ==
FailureOption.FINISH_RUNNING) {
this.failedDagIdsFinishRunning.add(dagId);
} else {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 5290444..8ca8966 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -31,6 +31,7 @@ import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
@@ -295,6 +296,14 @@ public class DagManagerUtils {
// Every dag node will contain the same flow metadata
Config config =
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(config);
+
+ if (dag.getFlowEvent() != null) {
+ flowEvent = dag.getFlowEvent();
+ }
+ if (dag.getMessage() != null) {
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
+ }
+
eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index d545f9c..ecd16de 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -259,6 +259,13 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
// In this case, the current time is used as the flow executionId.
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
Long.toString(System.currentTimeMillis()));
+
+ String message = "Flow was not compiled successfully. It may be due to
no path being found";
+ if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+ message = message + " or due to " + ((FlowSpec)
spec).getCompilationErrors();
+ }
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get()
.getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) :
null;
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index b670a7a..c4b6070 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -148,6 +148,7 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.FAILED.name());
properties.put(TimingEvent.JOB_END_TIME,
properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
+ case TimingEvent.FlowTimings.FLOW_CANCELLED:
case TimingEvent.LauncherTimings.JOB_CANCEL:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.CANCELLED.name());
properties.put(TimingEvent.JOB_END_TIME,
properties.getProperty(TimingEvent.METADATA_END_TIME));