This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 151fe14 [GOBBLIN-698] Enhance logging to print job and flow details
when a job…
151fe14 is described below
commit 151fe143a566060e5f25bbda56f1efd94fd181e1
Author: suvasude <[email protected]>
AuthorDate: Tue Mar 5 21:05:43 2019 -0800
[GOBBLIN-698] Enhance logging to print job and flow details when a job…
Closes #2569 from sv2000/logging
---
.../service/modules/orchestration/DagManager.java | 25 +++++++-----------
.../modules/orchestration/DagManagerUtils.java | 30 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 16 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 3a7c232..a9c1bbf 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -332,14 +332,14 @@ public class DagManager extends AbstractIdleService {
throws IOException {
//Add Dag to the map of running dags
String dagId = DagManagerUtils.generateDagId(dag);
- log.info("Initializing Dag {}", dagId);
+ log.info("Initializing Dag {}",
DagManagerUtils.getFullyQualifiedDagName(dag));
if (this.dags.containsKey(dagId)) {
log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
return;
}
this.dags.put(dagId, dag);
- log.info("Dag {} - determining if any jobs are already running.", dagId);
+ log.debug("Dag {} - determining if any jobs are already running.",
DagManagerUtils.getFullyQualifiedDagName(dag));
//Are there any jobs already in the running state? This check is for
Dags already running
//before a leadership change occurs.
for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
@@ -347,13 +347,13 @@ public class DagManager extends AbstractIdleService {
addJobState(dagId, dagNode);
}
}
- log.info("Dag {} submitting jobs ready for execution.", dagId);
+ log.debug("Dag {} submitting jobs ready for execution.",
DagManagerUtils.getFullyQualifiedDagName(dag));
//Determine the next set of jobs to run and submit them for execution
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted =
submitNext(dagId);
for (DagNode dagNode: nextSubmitted.get(dagId)) {
addJobState(dagId, dagNode);
}
- log.info("Dag {} Initialization complete.", dagId);
+ log.info("Dag {} Initialization complete.",
DagManagerUtils.getFullyQualifiedDagName(dag));
}
/**
@@ -451,18 +451,12 @@ public class DagManager extends AbstractIdleService {
JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+ String specExecutorUri =
dagNode.getValue().getSpecExecutor().getUri().toString();
+
// Run this spec on selected executor
SpecProducer producer = null;
try {
producer = DagManagerUtils.getSpecProducer(dagNode);
- Config jobConfig = DagManagerUtils.getJobConfig(dagNode);
- if (!jobConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
- log.warn("JobSpec does not contain flowExecutionId.");
- }
- log.info("Submitting job: {} on executor: {}", jobSpec, producer);
-
- log.info("Going to orchestrate JobSpec: {} on Executor: {}", jobSpec,
producer);
-
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) :
null;
@@ -475,12 +469,11 @@ public class DagManager extends AbstractIdleService {
if (jobOrchestrationTimer != null) {
jobOrchestrationTimer.stop(jobMetadata);
}
-
- log.info("Orchestrated JobSpec: {} on Executor: {}", jobSpec,
producer);
+ log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
} catch (Exception e) {
TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
- log.error("Cannot submit job: {} on executor: {}", jobSpec, producer,
e);
+ log.error("Cannot submit job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri, e);
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
}
@@ -495,7 +488,7 @@ public class DagManager extends AbstractIdleService {
throws IOException {
Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
String dagId = DagManagerUtils.generateDagId(dag);
- String jobName = DagManagerUtils.getJobName(dagNode);
+ String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId,
jobStatus.name());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 9ac51fe..0044c2b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -51,10 +51,40 @@ public class DagManagerUtils {
return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
}
+ /**
+ * Returns a fully-qualified {@link Dag} name that includes: (flowGroup,
flowName, flowExecutionId).
+ * @param dag
+ * @return fully qualified name of the underlying {@link Dag}.
+ */
+ static String getFullyQualifiedDagName(Dag<JobExecutionPlan> dag) {
+ Config jobConfig =
dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = ConfigUtils.getString(jobConfig,
ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String flowName = ConfigUtils.getString(jobConfig,
ConfigurationKeys.FLOW_NAME_KEY, "");
+ Long flowExecutionId = ConfigUtils.getLong(jobConfig,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
+
+ return "(flowGroup: " + flowGroup + ", flowName: " + flowName + ",
flowExecutionId: " + flowExecutionId + ")";
+ }
+
static String getJobName(DagNode<JobExecutionPlan> dagNode) {
return
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
}
+ /**
+ * Returns a fully-qualified job name that includes: (flowGroup, flowName,
flowExecutionId, jobName).
+ * @param dagNode
+ * @return a fully qualified name of the underlying job.
+ */
+ static String getFullyQualifiedJobName(DagNode<JobExecutionPlan> dagNode) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+
+ String flowGroup = ConfigUtils.getString(jobConfig,
ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String flowName = ConfigUtils.getString(jobConfig,
ConfigurationKeys.FLOW_NAME_KEY, "");
+ Long flowExecutionId = ConfigUtils.getLong(jobConfig,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
+ String jobName = ConfigUtils.getString(jobConfig,
ConfigurationKeys.JOB_NAME_KEY, "");
+
+ return "(flowGroup: " + flowGroup + ", flowName: " + flowName + ",
flowExecutionId: " + flowExecutionId + ", jobName: " + jobName + ")";
+ }
+
static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan>
dagNode) {
return dagNode.getValue();
}