This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6bed94a [GOBBLIN-802] change gauge metrics context to
RootMetricContext
6bed94a is described below
commit 6bed94a0f213ef64b225bb05b80ab338c25bbc17
Author: Arjun <[email protected]>
AuthorDate: Wed Jun 12 09:52:05 2019 -0700
[GOBBLIN-802] change gauge metrics context to RootMetricContext
Closes #2668 from arjun4084346/metric-fix
---
.../org/apache/gobblin/metrics/MetricContext.java | 1 +
.../modules/orchestration/Orchestrator.java | 43 +++++++++++++++++-----
2 files changed, 35 insertions(+), 9 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index c712c4d..7a851e4 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -508,6 +508,7 @@ public class MetricContext extends MetricRegistry
implements ReportableContext,
/**
* Create a new {@link ContextAwareGauge} wrapping a given {@link
com.codahale.metrics.Gauge}.
+ * Unlike other metrics, gauges are supposed to be registered by the caller.
*
* @param name name of the {@link ContextAwareGauge}
* @param gauge the {@link com.codahale.metrics.Gauge} to be wrapped by the
{@link ContextAwareGauge}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index c371b51..d8d5480 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -34,6 +34,7 @@ import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import javax.annotation.Nonnull;
@@ -45,7 +46,9 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -98,6 +101,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private final ClassAliasResolver<SpecCompiler> aliasResolver;
+ private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+
public Orchestrator(Config config, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
boolean instrumentationEnabled) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -223,25 +228,28 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
?
this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
: null;
- //If the FlowSpec disallows concurrent executions, then check if another
instance of the flow is already
- //running. If so, return immediately.
Config flowConfig = ((FlowSpec) spec).getConfig();
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+ if (!flowGauges.containsKey(spec.getUri().toString())) {
+ String flowCompiledGaugeName =
MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
flowGroup, flowName, ServiceMetricNames.COMPILED);
+ flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
+ ContextAwareGauge<Integer> gauge =
RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () ->
flowGauges.get(spec.getUri().toString()).state.value);
+ RootMetricContext.get().register(flowCompiledGaugeName, gauge);
+ }
+
+ //If the FlowSpec disallows concurrent executions, then check if another
instance of the flow is already
+ //running. If so, return immediately.
boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, true);
if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
_log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup,
flowName);
- // We send a gauge with value 0 signifying that the flow could not be
compiled because previous execution is already running
- metricContext.newContextAwareGauge(
-
MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
flowGroup, flowName, ServiceMetricNames.COMPILED),
- () -> 0L);
+
flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
return;
} else {
- metricContext.newContextAwareGauge(
-
MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
flowGroup, flowName, ServiceMetricNames.COMPILED),
- () -> 1L);
+
flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
}
Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);
@@ -404,4 +412,21 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
public void switchMetricContext(MetricContext context) {
throw new UnsupportedOperationException();
}
+
+ @Setter
+ private static class FlowCompiledState {
+ private CompiledState state = CompiledState.UNKNOWN;
+ }
+
+ private enum CompiledState {
+ FAILED(-1),
+ UNKNOWN(0),
+ SUCCESSFUL(1);
+
+ public int value;
+
+ CompiledState(int value) {
+ this.value = value;
+ }
+ }
}
\ No newline at end of file