This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8ef1159 [GOBBLIN-792] submit a GobblinTrackingEvent when jobs are
compiled but not yet orchestrated
8ef1159 is described below
commit 8ef115992eaa5eb4e1c47a2e0eab726f387ea0b8
Author: Arjun <[email protected]>
AuthorDate: Fri Jul 19 14:36:22 2019 -0700
[GOBBLIN-792] submit a GobblinTrackingEvent when jobs are compiled but not
yet orchestrated
Closes #2657 from arjun4084346/preOrchestrationEvent
---
.../apache/gobblin/metrics/event/TimingEvent.java | 1 +
.../apache/gobblin/service/ExecutionStatus.pdsc | 5 +++--
...ache.gobblin.service.flowstatuses.snapshot.json | 5 +++--
.../apache/gobblin/service/FlowStatusResource.java | 8 +++++++-
.../service/modules/orchestration/DagManager.java | 23 ++++++++++++++++++++++
.../modules/orchestration/DagManagerUtils.java | 2 +-
.../service/modules/spec/JobExecutionPlan.java | 2 +-
.../monitoring/KafkaAvroJobStatusMonitor.java | 3 +++
8 files changed, 42 insertions(+), 7 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index d699238..db30229 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -39,6 +39,7 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
public static final String FULL_JOB_EXECUTION = "FullJobExecutionTimer";
public static final String WORK_UNITS_CREATION = "WorkUnitsCreationTimer";
public static final String WORK_UNITS_PREPARATION =
"WorkUnitsPreparationTimer";
+ public static final String JOB_PENDING = "JobPending";
public static final String JOB_ORCHESTRATED = "JobOrchestrated";
public static final String JOB_PREPARE = "JobPrepareTimer";
public static final String JOB_START = "JobStartTimer";
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
index c87c484..5a9635e 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
@@ -3,13 +3,14 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
- "symbols" : ["COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", "CANCELLED",
"COMPLETE"],
+ "symbols" : ["COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", "CANCELLED",
"COMPLETE", "PENDING"],
"symbolDocs" : {
"COMPILED":"Flow compiled to jobs.",
"ORCHESTRATED":"Job(s) orchestrated to spec executors.",
"RUNNING": "Flow or job is currently executing",
"FAILED":"Flow or job failed",
"CANCELLED":"Flow cancelled.",
- "COMPLETE":"Flow or job completed execution"
+ "COMPLETE":"Flow or job completed execution",
+ "PENDING":"Flow or job is in pending state."
}
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index 74d906b..d84ea73 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -36,14 +36,15 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
- "symbols" : [ "COMPILED", "ORCHESTRATED", "RUNNING", "FAILED",
"CANCELLED", "COMPLETE" ],
+ "symbols" : [ "COMPILED", "ORCHESTRATED", "RUNNING", "FAILED",
"CANCELLED", "COMPLETE", "PENDING" ],
"symbolDocs" : {
"COMPILED" : "Flow compiled to jobs.",
"ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
"RUNNING" : "Flow or job is currently executing",
"FAILED" : "Flow or job failed",
"CANCELLED" : "Flow cancelled.",
- "COMPLETE" : "Flow or job completed execution"
+ "COMPLETE" : "Flow or job completed execution",
+ "PENDING" : "Flow or job is in pending state."
}
}, {
"type" : "record",
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 1f371d6..78f28f8 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -183,10 +183,16 @@ public class FlowStatusResource extends
ComplexKeyResourceTemplate<FlowStatusId,
return ExecutionStatus.CANCELLED;
}
+ if (currentFlowExecutionStatus == ExecutionStatus.COMPLETE &&
+ jobExecutionStatus == ExecutionStatus.PENDING) {
+ return ExecutionStatus.PENDING;
+ }
+
if (currentFlowExecutionStatus == ExecutionStatus.RUNNING ||
jobExecutionStatus == ExecutionStatus.RUNNING ||
jobExecutionStatus == ExecutionStatus.ORCHESTRATED ||
- jobExecutionStatus == ExecutionStatus.COMPILED) {
+ jobExecutionStatus == ExecutionStatus.COMPILED ||
+ jobExecutionStatus == ExecutionStatus.PENDING) {
return ExecutionStatus.RUNNING;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 43f1ee8..2fbb796 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -72,6 +72,7 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import static org.apache.gobblin.service.ExecutionStatus.COMPLETE;
import static org.apache.gobblin.service.ExecutionStatus.FAILED;
import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
import static org.apache.gobblin.service.ExecutionStatus.valueOf;
@@ -146,6 +147,7 @@ public class DagManager extends AbstractIdleService {
private final JobStatusRetriever jobStatusRetriever;
private final KafkaJobStatusMonitor jobStatusMonitor;
private final Config config;
+ private final Optional<EventSubmitter> eventSubmitter;
private volatile boolean isActive = false;
@@ -156,6 +158,12 @@ public class DagManager extends AbstractIdleService {
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
+ if (instrumentationEnabled) {
+ MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
+ this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+ } else {
+ this.eventSubmitter = Optional.absent();
+ }
try {
Class jobStatusRetrieverClass =
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY,
DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
@@ -187,12 +195,24 @@ public class DagManager extends AbstractIdleService {
synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException {
//Persist the dag
this.dagStateStore.writeCheckpoint(dag);
+ submitEventsAndSetStatus(dag);
//Add it to the queue of dags
if (!this.queue.offer(dag)) {
throw new IOException("Could not add dag" +
DagManagerUtils.generateDagId(dag) + "to queue");
}
}
+ private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
+ if (this.eventSubmitter.isPresent()) {
+ for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ }
+ }
+ }
+
public synchronized void setTopologySpecMap(Map<URI, TopologySpec>
topologySpecMap) {
this.topologySpecMap = topologySpecMap;
}
@@ -380,6 +400,9 @@ public class DagManager extends AbstractIdleService {
nextSubmitted.putAll(onJobFinish(node));
nodesToCleanUp.add(node);
break;
+ case PENDING:
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ break;
default:
jobExecutionPlan.setExecutionStatus(RUNNING);
break;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 14fc2bc..110ae0e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -141,7 +141,7 @@ public class DagManagerUtils {
DagNode<JobExecutionPlan> node = nodesToExpand.poll();
ExecutionStatus executionStatus = getExecutionStatus(node);
boolean addFlag = true;
- if (executionStatus == ExecutionStatus.$UNKNOWN) {
+ if (executionStatus == ExecutionStatus.PENDING) {
//Add a node to be executed next, only if all of its parent nodes are
COMPLETE.
List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 485f555..acdafea 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -58,7 +58,7 @@ public class JobExecutionPlan {
private final JobSpec jobSpec;
private final SpecExecutor specExecutor;
- private ExecutionStatus executionStatus = ExecutionStatus.$UNKNOWN;
+ private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
private final int maxAttempts;
private int currentAttempts = 0;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index fe4d171..e8611cf 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -126,6 +126,9 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
case TimingEvent.FlowTimings.FLOW_COMPILE_FAILED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.FAILED.name());
break;
+ case TimingEvent.LauncherTimings.JOB_PENDING:
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.PENDING.name());
+ break;
case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.ORCHESTRATED.name());
break;