This is an automated email from the ASF dual-hosted git repository.
kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 65e2a64 [GOBBLIN-1090] send compiled_skip metrics
65e2a64 is described below
commit 65e2a64d2e40f46203837f4785c2426eae29a12e
Author: Arjun <[email protected]>
AuthorDate: Wed Mar 18 20:58:13 2020 -0700
[GOBBLIN-1090] send compiled_skip metrics
Closes #2931 from arjun4084346/compiledMetrics
---
.../service/modules/orchestration/Orchestrator.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7a1b611..b7b6593 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -225,10 +225,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
long startTime = System.nanoTime();
if (spec instanceof FlowSpec) {
- TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
- ?
this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
- : null;
-
Config flowConfig = ((FlowSpec) spec).getConfig();
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
@@ -247,12 +243,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
_log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup,
flowName);
-
flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
+
flowGauges.get(spec.getUri().toString()).setState(CompiledState.SKIPPED);
return;
- } else {
-
flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
}
+ TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
+ ?
this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
+ : null;
+
Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
@@ -265,11 +263,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get()
.getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) :
null;
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+
flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
_log.warn("Cannot determine an executor to run on for Spec: " + spec);
if (flowCompileFailedTimer != null) {
flowCompileFailedTimer.stop(flowMetadata);
}
return;
+ } else {
+
flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
}
//If it is a scheduled flow (and hence, does not have flowExecutionId in
the FlowSpec) and the flow compilation is successful,
@@ -281,7 +282,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
flowCompilationTimer.stop(flowMetadata);
}
-
if (this.dagManager.isPresent()) {
//Send the dag to the DagManager.
this.dagManager.get().addDag(jobExecutionPlanDag, true);
@@ -406,7 +406,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private enum CompiledState {
FAILED(-1),
UNKNOWN(0),
- SUCCESSFUL(1);
+ SUCCESSFUL(1),
+ SKIPPED(2);
public int value;