This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 151fe14  [GOBBLIN-698] Enhance logging to print job and flow details 
when a job…
151fe14 is described below

commit 151fe143a566060e5f25bbda56f1efd94fd181e1
Author: suvasude <[email protected]>
AuthorDate: Tue Mar 5 21:05:43 2019 -0800

    [GOBBLIN-698] Enhance logging to print job and flow details when a job…
    
    Closes #2569 from sv2000/logging
---
 .../service/modules/orchestration/DagManager.java  | 25 +++++++-----------
 .../modules/orchestration/DagManagerUtils.java     | 30 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 3a7c232..a9c1bbf 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -332,14 +332,14 @@ public class DagManager extends AbstractIdleService {
         throws IOException {
       //Add Dag to the map of running dags
       String dagId = DagManagerUtils.generateDagId(dag);
-      log.info("Initializing Dag {}", dagId);
+      log.info("Initializing Dag {}", 
DagManagerUtils.getFullyQualifiedDagName(dag));
       if (this.dags.containsKey(dagId)) {
         log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
         return;
       }
 
       this.dags.put(dagId, dag);
-      log.info("Dag {} - determining if any jobs are already running.", dagId);
+      log.debug("Dag {} - determining if any jobs are already running.", 
DagManagerUtils.getFullyQualifiedDagName(dag));
       //Are there any jobs already in the running state? This check is for 
Dags already running
       //before a leadership change occurs.
       for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
@@ -347,13 +347,13 @@ public class DagManager extends AbstractIdleService {
           addJobState(dagId, dagNode);
         }
       }
-      log.info("Dag {} submitting jobs ready for execution.", dagId);
+      log.debug("Dag {} submitting jobs ready for execution.", 
DagManagerUtils.getFullyQualifiedDagName(dag));
       //Determine the next set of jobs to run and submit them for execution
       Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = 
submitNext(dagId);
       for (DagNode dagNode: nextSubmitted.get(dagId)) {
         addJobState(dagId, dagNode);
       }
-      log.info("Dag {} Initialization complete.", dagId);
+      log.info("Dag {} Initialization complete.", 
DagManagerUtils.getFullyQualifiedDagName(dag));
     }
 
     /**
@@ -451,18 +451,12 @@ public class DagManager extends AbstractIdleService {
       JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
       Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
 
+      String specExecutorUri = 
dagNode.getValue().getSpecExecutor().getUri().toString();
+
       // Run this spec on selected executor
       SpecProducer producer = null;
       try {
         producer = DagManagerUtils.getSpecProducer(dagNode);
-        Config jobConfig = DagManagerUtils.getJobConfig(dagNode);
-        if (!jobConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
-          log.warn("JobSpec does not contain flowExecutionId.");
-        }
-        log.info("Submitting job: {} on executor: {}", jobSpec, producer);
-
-        log.info("Going to orchestrate JobSpec: {} on Executor: {}", jobSpec, 
producer);
-
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : 
null;
 
@@ -475,12 +469,11 @@ public class DagManager extends AbstractIdleService {
         if (jobOrchestrationTimer != null) {
           jobOrchestrationTimer.stop(jobMetadata);
         }
-
-        log.info("Orchestrated JobSpec: {} on Executor: {}", jobSpec, 
producer);
+        log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
       } catch (Exception e) {
         TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
-        log.error("Cannot submit job: {} on executor: {}", jobSpec, producer, 
e);
+        log.error("Cannot submit job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri, e);
         if (jobFailedTimer != null) {
           jobFailedTimer.stop(jobMetadata);
         }
@@ -495,7 +488,7 @@ public class DagManager extends AbstractIdleService {
         throws IOException {
       Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
       String dagId = DagManagerUtils.generateDagId(dag);
-      String jobName = DagManagerUtils.getJobName(dagNode);
+      String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
       ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
       log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, 
jobStatus.name());
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 9ac51fe..0044c2b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -51,10 +51,40 @@ public class DagManagerUtils {
     return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
   }
 
+  /**
+   * Returns a fully-qualified {@link Dag} name that includes: (flowGroup, 
flowName, flowExecutionId).
+   * @param dag
+   * @return fully qualified name of the underlying {@link Dag}.
+   */
+  static String getFullyQualifiedDagName(Dag<JobExecutionPlan> dag) {
+    Config jobConfig = 
dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.FLOW_GROUP_KEY, "");
+    String flowName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.FLOW_NAME_KEY, "");
+    Long flowExecutionId = ConfigUtils.getLong(jobConfig, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
+
+    return "(flowGroup: " + flowGroup + ", flowName: " + flowName + ", 
flowExecutionId: " + flowExecutionId + ")";
+  }
+
   static String getJobName(DagNode<JobExecutionPlan> dagNode) {
     return 
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
   }
 
+  /**
+   * Returns a fully-qualified job name that includes: (flowGroup, flowName, 
flowExecutionId, jobName).
+   * @param dagNode
+   * @return a fully qualified name of the underlying job.
+   */
+  static String getFullyQualifiedJobName(DagNode<JobExecutionPlan> dagNode) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+
+    String flowGroup = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.FLOW_GROUP_KEY, "");
+    String flowName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.FLOW_NAME_KEY, "");
+    Long flowExecutionId = ConfigUtils.getLong(jobConfig, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
+    String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
+
+    return "(flowGroup: " + flowGroup + ", flowName: " + flowName + ", 
flowExecutionId: " + flowExecutionId + ", jobName: " + jobName + ")";
+  }
+
   static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan> 
dagNode) {
     return dagNode.getValue();
   }

Reply via email to