This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 93f9a81  [GOBBLIN-1154] Improve gaas error messages
93f9a81 is described below

commit 93f9a817ff9cc259734eec81e4cb7408aab8cc98
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Jun 1 18:27:07 2020 -0700

    [GOBBLIN-1154] Improve gaas error messages
    
    Closes #2993 from jack-moseley/gaas-improved-
    errors
---
 .../apache/gobblin/metrics/event/TimingEvent.java  |  1 +
 .../org/apache/gobblin/service/FlowStatusTest.java | 23 ++++++-----------
 .../service/FlowConfigV2ResourceLocalHandler.java  | 12 ++++++---
 .../gobblin/service/FlowExecutionResource.java     | 29 ++++++++++-----------
 .../gobblin/service/modules/flowgraph/Dag.java     |  7 +++++
 .../service/modules/orchestration/DagManager.java  | 30 ++++++++++++++++++----
 .../modules/orchestration/DagManagerUtils.java     |  9 +++++++
 .../modules/orchestration/Orchestrator.java        |  7 +++++
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  1 +
 9 files changed, 81 insertions(+), 38 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 0664c2f..45af9ad 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -70,6 +70,7 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
     public static final String FLOW_SUCCEEDED = "FlowSucceeded";
     public static final String FLOW_FAILED = "FlowFailed";
     public static final String FLOW_RUNNING = "FlowRunning";
+    public static final String FLOW_CANCELLED = "FlowCancelled";
   }
 
   public static class FlowEventConstants {
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index b008f83..e133582 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -28,7 +28,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.inject.Binder;
@@ -38,7 +37,6 @@ import com.google.inject.Module;
 import com.google.inject.name.Names;
 import com.linkedin.restli.server.resources.BaseResource;
 
-import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
@@ -50,7 +48,6 @@ public class FlowStatusTest {
   private FlowStatusClient _client;
   private EmbeddedRestliServer _server;
   private List<List<org.apache.gobblin.service.monitoring.JobStatus>> 
_listOfJobStatusLists;
-  private Joiner messageJoiner;
 
   class TestJobStatusRetriever extends JobStatusRetriever {
     @Override
@@ -85,8 +82,6 @@ public class FlowStatusTest {
 
   @BeforeClass
   public void setUp() throws Exception {
-    ConfigBuilder configBuilder = ConfigBuilder.create();
-
     JobStatusRetriever jobStatusRetriever = new TestJobStatusRetriever();
     final FlowStatusGenerator flowStatusGenerator =
         
FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
@@ -107,8 +102,6 @@ public class FlowStatusTest {
 
     _client =
        new FlowStatusClient(String.format("http://localhost:%s/", 
_server.getPort()));
-
-    messageJoiner = Joiner.on(FlowStatusResource.MESSAGE_SEPARATOR);
   }
 
   /**
@@ -134,7 +127,7 @@ public class FlowStatusTest {
         
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
     org.apache.gobblin.service.monitoring.JobStatus fs2 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).build();
+        
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Flow 
message").build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = 
Lists.newArrayList(js1, fs1);
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = 
Lists.newArrayList(js2, js3, fs2);
     _listOfJobStatusLists = Lists.newArrayList();
@@ -148,7 +141,7 @@ public class FlowStatusTest {
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
     
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(),
 1L);
     
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(),
 7000L);
-    Assert.assertEquals(flowStatus.getMessage(), 
messageJoiner.join(js2.getMessage(), js3.getMessage()));
+    Assert.assertEquals(flowStatus.getMessage(), fs2.getMessage());
     Assert.assertEquals(flowStatus.getExecutionStatus(), 
ExecutionStatus.COMPLETE);
 
     JobStatusArray jobStatuses = flowStatus.getJobStatuses();
@@ -189,7 +182,7 @@ public class FlowStatusTest {
         
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
     org.apache.gobblin.service.monitoring.JobStatus fs1 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
-        .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).build();
+        
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Flow 
message").build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = 
Lists.newArrayList(js1, js2, fs1);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList);
@@ -201,7 +194,7 @@ public class FlowStatusTest {
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
     
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(),
 0L);
     
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(),
 7000L);
-    Assert.assertEquals(flowStatus.getMessage(), 
messageJoiner.join(js1.getMessage(), js2.getMessage()));
+    Assert.assertEquals(flowStatus.getMessage(), fs1.getMessage());
     Assert.assertEquals(flowStatus.getExecutionStatus(), 
ExecutionStatus.COMPLETE);
 
     JobStatusArray jobStatuses = flowStatus.getJobStatuses();
@@ -232,7 +225,7 @@ public class FlowStatusTest {
         
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
     org.apache.gobblin.service.monitoring.JobStatus fs1 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY)
-        .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).build();
+        
.eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Flow 
message").build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = 
Lists.newArrayList(js1, js2, fs1);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList);
@@ -244,7 +237,7 @@ public class FlowStatusTest {
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
     
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(),
 0L);
     
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(),
 0L);
-    Assert.assertEquals(flowStatus.getMessage(), 
messageJoiner.join(js1.getMessage(), js2.getMessage()));
+    Assert.assertEquals(flowStatus.getMessage(), fs1.getMessage());
     Assert.assertEquals(flowStatus.getExecutionStatus(), 
ExecutionStatus.RUNNING);
 
     JobStatusArray jobStatuses = flowStatus.getJobStatuses();
@@ -275,7 +268,7 @@ public class FlowStatusTest {
         
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
     org.apache.gobblin.service.monitoring.JobStatus fs1 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup(JobStatusRetriever.NA_KEY).jobName(JobStatusRetriever.NA_KEY).endTime(7000L)
-        .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).build();
+        
.eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Flow 
message").build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = 
Lists.newArrayList(js1, js2, fs1);
     _listOfJobStatusLists = Lists.newArrayList();
     _listOfJobStatusLists.add(jobStatusList);
@@ -287,7 +280,7 @@ public class FlowStatusTest {
     Assert.assertEquals(flowStatus.getId().getFlowName(), "flow1");
     
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionStartTime().longValue(),
 0L);
     
Assert.assertEquals(flowStatus.getExecutionStatistics().getExecutionEndTime().longValue(),
 7000L);
-    Assert.assertEquals(flowStatus.getMessage(), 
messageJoiner.join(js1.getMessage(), js2.getMessage()));
+    Assert.assertEquals(flowStatus.getMessage(), fs1.getMessage());
     Assert.assertEquals(flowStatus.getExecutionStatus(), 
ExecutionStatus.FAILED);
 
     JobStatusArray jobStatuses = flowStatus.getJobStatuses();
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index efdb990..0990338 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -24,6 +24,7 @@ import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateKVResponse;
+import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 
 import lombok.extern.slf4j.Slf4j;
@@ -62,8 +63,9 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
 
     // Return conflict and take no action if flowSpec has already been created
     if (this.flowCatalog.exists(flowSpec.getUri())) {
-      log.warn("Flowspec with URI {} already exists, no action will be taken", 
flowSpec.getUri());
-      return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), 
flowStatusId), flowConfig, HttpStatus.S_409_CONFLICT);
+      log.warn("FlowSpec with URI {} already exists, no action will be taken", 
flowSpec.getUri());
+      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
+          "FlowSpec with URI " + flowSpec.getUri() + " already exists, no 
action will be taken");
     }
 
     Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, 
triggerListener);
@@ -81,7 +83,11 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
     } else if 
(Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL,
 new AddSpecResponse<>("false")).getValue().toString())) {
       httpStatus = HttpStatus.S_201_CREATED;
     } else {
-      httpStatus = HttpStatus.S_400_BAD_REQUEST;
+      String message = "Flow was not compiled successfully. It may be due to 
no path being found";
+      if (!flowSpec.getCompilationErrors().isEmpty()) {
+        message = message + " or due to " + flowSpec.getCompilationErrors();
+      }
+      throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, message);
     }
 
     return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), 
flowStatusId), flowConfig, httpStatus);
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index fcdf6a1..afd55a2 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -32,6 +32,7 @@ import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.Context;
 import com.linkedin.restli.server.annotations.Finder;
@@ -65,8 +66,12 @@ public class FlowExecutionResource extends 
ComplexKeyResourceTemplate<FlowStatus
    */
   @Override
   public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
-    // this returns null to raise a 404 error if flowStatus is null
-    return convertFlowStatus(getFlowStatusFromGenerator(key, 
this._flowStatusGenerator));
+    FlowExecution flowExecution = 
convertFlowStatus(getFlowStatusFromGenerator(key, this._flowStatusGenerator));
+    if (flowExecution == null) {
+      throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow 
execution found for flowStatusId " + key.getKey()
+      + ". The flowStatusId may be incorrect, or the flow execution may have 
been cleaned up.");
+    }
+    return flowExecution;
   }
 
   @Finder("latestFlowExecution")
@@ -78,8 +83,8 @@ public class FlowExecutionResource extends 
ComplexKeyResourceTemplate<FlowStatus
       return 
flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
     }
 
-    // will return 404 status code
-    return null;
+    throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow 
execution found for flowId " + flowId
+        + ". The flowId may be incorrect, or the flow execution may have been 
cleaned up.");
   }
 
   /**
@@ -135,7 +140,7 @@ public class FlowExecutionResource extends 
ComplexKeyResourceTemplate<FlowStatus
     long flowEndTime = 0L;
     ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
 
-    StringBuffer flowMessagesStringBuffer = new StringBuffer();
+    String flowMessage = "";
 
     while (jobStatusIter.hasNext()) {
       org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = 
jobStatusIter.next();
@@ -144,6 +149,9 @@ public class FlowExecutionResource extends 
ComplexKeyResourceTemplate<FlowStatus
       if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
         flowEndTime = queriedJobStatus.getEndTime();
         flowExecutionStatus = 
ExecutionStatus.valueOf(queriedJobStatus.getEventName());
+        if (queriedJobStatus.getMessage() != null) {
+          flowMessage = queriedJobStatus.getMessage();
+        }
         continue;
       }
 
@@ -163,25 +171,16 @@ public class FlowExecutionResource extends 
ComplexKeyResourceTemplate<FlowStatus
               setHighWatermark(queriedJobStatus.getHighWatermark()));
 
       jobStatusArray.add(jobStatus);
-
-      if (!queriedJobStatus.getMessage().isEmpty()) {
-        flowMessagesStringBuffer.append(queriedJobStatus.getMessage());
-        flowMessagesStringBuffer.append(MESSAGE_SEPARATOR);
-      }
     }
 
     jobStatusArray.sort(Comparator.comparing((JobStatus js) -> 
js.getExecutionStatistics().getExecutionStartTime()));
 
-    String flowMessages = flowMessagesStringBuffer.length() > 0 ?
-        flowMessagesStringBuffer.substring(0, 
flowMessagesStringBuffer.length() -
-            MESSAGE_SEPARATOR.length()) : StringUtils.EMPTY;
-
     return new FlowExecution()
         .setId(new 
FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
             .setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
         .setExecutionStatistics(new 
FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
             .setExecutionEndTime(flowEndTime))
-        .setMessage(flowMessages)
+        .setMessage(flowMessage)
         .setExecutionStatus(flowExecutionStatus)
         .setJobStatuses(jobStatusArray);
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 745a15d..a1f0d2c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -31,8 +31,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import lombok.Getter;
+import lombok.Setter;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ExecutionStatus;
 
 
 /**
@@ -48,6 +50,11 @@ public class Dag<T> {
   private Map<DagNode, List<DagNode<T>>> parentChildMap;
   private List<DagNode<T>> nodes;
 
+  @Setter
+  private String message;
+  @Setter
+  private String flowEvent;
+
   public Dag(List<DagNode<T>> dagNodes) {
     this.nodes = dagNodes;
     //Build dag
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f323ee3..db1145a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -476,6 +476,9 @@ public class DagManager extends AbstractIdleService {
         for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
           cancelDagNode(dagNodeToCancel);
         }
+
+        
this.dags.get(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+        this.dags.get(dagToCancel).setMessage("Flow killed by request");
       } else {
         log.warn("Did not find Dag with id {}, it might be already 
cancelled/finished.", dagToCancel);
       }
@@ -638,6 +641,11 @@ public class DagManager extends AbstractIdleService {
             DagManagerUtils.getFullyQualifiedDagName(node),
             timeOutForJobStart);
         cancelDagNode(node);
+
+        String dagId = DagManagerUtils.generateDagId(node);
+        
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+        this.dags.get(dagId).setMessage("Flow killed because no update 
received for " + timeOutForJobStart + " ms after orchestration");
+
         return true;
       } else {
         return false;
@@ -680,10 +688,13 @@ public class DagManager extends AbstractIdleService {
 
       if (flowSla != DagManagerUtils.NO_SLA && currentTime > flowStartTime + 
flowSla) {
         log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} 
now...",
-            
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
-            
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY),
-            flowSla);
+            
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
 flowSla,
+            
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
         cancelDagNode(node);
+
+        
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+        this.dags.get(dagId).setMessage("Flow killed due to exceeding SLA of " 
+ flowSla + " ms");
+
         return true;
       }
       return false;
@@ -779,7 +790,9 @@ public class DagManager extends AbstractIdleService {
       } catch (Exception e) {
         TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
-        log.error("Cannot submit job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri, e);
+        String message = "Cannot submit job " + 
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + 
specExecutorUri;
+        log.error(message, e);
+        jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + 
e.getMessage());
         if (jobFailedTimer != null) {
           jobFailedTimer.stop(jobMetadata);
         }
@@ -863,8 +876,15 @@ public class DagManager extends AbstractIdleService {
 
       switch (jobStatus) {
         // TODO : For now treat canceled as failed, till we introduce failure 
option - CANCEL
-        case CANCELLED:
         case FAILED:
+          dag.setMessage("Flow failed because job " + jobName + " failed");
+          if (DagManagerUtils.getFailureOption(dag) == 
FailureOption.FINISH_RUNNING) {
+            this.failedDagIdsFinishRunning.add(dagId);
+          } else {
+            this.failedDagIdsFinishAllPossible.add(dagId);
+          }
+          return Maps.newHashMap();
+        case CANCELLED:
           if (DagManagerUtils.getFailureOption(dag) == 
FailureOption.FINISH_RUNNING) {
             this.failedDagIdsFinishRunning.add(dagId);
           } else {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 5290444..8ca8966 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -31,6 +31,7 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
@@ -295,6 +296,14 @@ public class DagManagerUtils {
       // Every dag node will contain the same flow metadata
       Config config = 
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
       Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(config);
+
+      if (dag.getFlowEvent() != null) {
+        flowEvent = dag.getFlowEvent();
+      }
+      if (dag.getMessage() != null) {
+        flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
+      }
+
       eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
     }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index d545f9c..ecd16de 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -259,6 +259,13 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         // In this case, the current time is used as the flow executionId.
         
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
             Long.toString(System.currentTimeMillis()));
+
+        String message = "Flow was not compiled successfully. It may be due to 
no path being found";
+        if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+          message = message + " or due to " + ((FlowSpec) 
spec).getCompilationErrors();
+        }
+        flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
         TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get()
             .getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) : 
null;
         Instrumented.markMeter(this.flowOrchestrationFailedMeter);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index b670a7a..c4b6070 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -148,6 +148,7 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.FAILED.name());
         properties.put(TimingEvent.JOB_END_TIME, 
properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
+      case TimingEvent.FlowTimings.FLOW_CANCELLED:
       case TimingEvent.LauncherTimings.JOB_CANCEL:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
         properties.put(TimingEvent.JOB_END_TIME, 
properties.getProperty(TimingEvent.METADATA_END_TIME));

Reply via email to