This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 133831b [GOBBLIN-771] add a few metrics for gobblin service
133831b is described below
commit 133831b702beb82fa9a7ecaa42991016375bb7e7
Author: Arjun <[email protected]>
AuthorDate: Thu May 23 23:50:55 2019 -0700
[GOBBLIN-771] add a few metrics for gobblin service
Closes #2635 from arjun4084346/gaasMetrics
---
.../metrics/reporter/MetricReportReporter.java | 1 -
.../metrics/reporter/util/MetricReportUtils.java | 3 +++
.../gobblin/metrics}/ServiceMetricNames.java | 9 +++++++-
.../service/FlowConfigResourceLocalHandler.java | 24 ++++++++++++++++++++++
.../service/FlowConfigV2ResourceLocalHandler.java | 2 ++
.../modules/flow/BaseFlowToJobSpecCompiler.java | 2 +-
.../service/modules/orchestration/DagManager.java | 21 +++++++++++++++++--
.../modules/orchestration/Orchestrator.java | 13 +++++++++++-
8 files changed, 69 insertions(+), 6 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/MetricReportReporter.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/MetricReportReporter.java
index 75dea54..c971480 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/MetricReportReporter.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/MetricReportReporter.java
@@ -124,7 +124,6 @@ public abstract class MetricReportReporter extends
ConfiguredScheduledReporter {
for (Map.Entry<String, Timer> timer : timers.entrySet()) {
metrics.addAll(serializeSnapshot(timer.getKey(),
timer.getValue().getSnapshot()));
metrics.addAll(serializeMetered(timer.getKey(), timer.getValue()));
- metrics.addAll(serializeSingleValue(timer.getKey(),
timer.getValue().getCount(), Measurements.COUNT.getName()));
}
Map<String, Object> allTags = Maps.newHashMap();
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index 34727fe..3cdb0a4 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -38,6 +38,9 @@ public class MetricReportUtils {
public static final int SCHEMA_VERSION = 1;
private static Optional<SpecificDatumReader<MetricReport>> READER =
Optional.absent();
+ // This prefix can be used to distinguish metrics reported by GobblinService
from other metrics reported by Gobblin
+ // This can be used in conjunction with MetricNameRegexFilter to filter out
metrics in any MetricReporter
+ public static final String GOBBLIN_SERVICE_METRICS_PREFIX = "GobblinService";
/**
* Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte
array representing a json input.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
similarity index 83%
rename from
gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
rename to
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index f72093b..3a8ecaa 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.service;
+package org.apache.gobblin.metrics;
public class ServiceMetricNames {
private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
@@ -31,4 +31,11 @@ public class ServiceMetricNames {
//Job status poll timer
public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX
+ "jobStatusPoll.time";
+
+ public static final String CREATE_FLOW_METER = "CreateFlow";
+ public static final String DELETE_FLOW_METER = "DeleteFlow";
+ public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
+
+ public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
+ public static final String COMPILED = "Compiled";
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index a82e760..fcabd63 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import org.apache.commons.lang.StringUtils;
+import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import com.linkedin.data.template.StringMap;
import com.linkedin.restli.common.ComplexResourceKey;
@@ -39,9 +40,16 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+
/**
* A {@link FlowConfigsResourceHandler} that handles Restli locally.
@@ -51,8 +59,19 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
public static final Schedule NEVER_RUN_CRON_SCHEDULE = new
Schedule().setCronSchedule("0 0 0 ? 1 1 2050");
@Getter
protected FlowCatalog flowCatalog;
+ protected final ContextAwareMeter createFlow;
+ protected final ContextAwareMeter deleteFlow;
+ protected final ContextAwareMeter runImmediatelyFlow;
+
public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) {
this.flowCatalog = flowCatalog;
+ MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
+ this.createFlow = metricContext.contextAwareMeter(
+ MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
ServiceMetricNames.CREATE_FLOW_METER));
+ this.deleteFlow = metricContext.contextAwareMeter(
+ MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
ServiceMetricNames.DELETE_FLOW_METER));
+ this.runImmediatelyFlow = metricContext.contextAwareMeter(
+ MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER));
}
/**
@@ -108,6 +127,10 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
*/
public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean
triggerListener) throws FlowConfigLoggedException {
log.info("[GAAS-REST] Create called with flowGroup " +
flowConfig.getId().getFlowGroup() + " flowName " +
flowConfig.getId().getFlowName());
+ this.createFlow.mark();
+ if (!flowConfig.hasSchedule() ||
StringUtils.isEmpty(flowConfig.getSchedule().getCronSchedule())) {
+ this.runImmediatelyFlow.mark();
+ }
if (flowConfig.hasExplain()) {
//Return Error if FlowConfig has explain set. Explain request is only
valid for v2 FlowConfig.
@@ -171,6 +194,7 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header,
boolean triggerListener) throws FlowConfigLoggedException {
log.info("[GAAS-REST] Delete called with flowGroup {} flowName {}",
flowId.getFlowGroup(), flowId.getFlowName());
+ this.deleteFlow.mark();
URI flowUri = null;
try {
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index c0c7fa6..b1e8eeb 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -43,6 +43,8 @@ public class FlowConfigV2ResourceLocalHandler extends
FlowConfigResourceLocalHan
*/
public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean
triggerListener) throws FlowConfigLoggedException {
String createLog = "[GAAS-REST] Create called with flowGroup " +
flowConfig.getId().getFlowGroup() + " flowName " +
flowConfig.getId().getFlowName();
+ this.createFlow.mark();
+
if (flowConfig.hasExplain()) {
createLog += " explain " + Boolean.toString(flowConfig.isExplain());
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 16bb04c..8374211 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -56,7 +56,7 @@ import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.ServiceMetricNames;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5210778..e8f45d5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
@@ -46,15 +47,17 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceMetricNames;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -278,7 +281,6 @@ public class DagManager extends AbstractIdleService {
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build());
this.jobStatusPolledTimer =
Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
-
} else {
this.metricContext = null;
this.eventSubmitter = Optional.absent();
@@ -463,6 +465,9 @@ public class DagManager extends AbstractIdleService {
// either successfully or unsuccessfully. To catch any exceptions in
the job submission, the DagManagerThread
// blocks (by calling Future#get()) until the submission is completed.
producer.addSpec(jobSpec).get();
+ if (this.metricContext != null) {
+ getRunningJobsCounter(dagNode).inc();
+ }
if (jobOrchestrationTimer != null) {
jobOrchestrationTimer.stop(jobMetadata);
@@ -490,6 +495,10 @@ public class DagManager extends AbstractIdleService {
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId,
jobStatus.name());
+ if (this.metricContext != null) {
+ getRunningJobsCounter(dagNode).dec();
+ }
+
if (jobStatus == COMPLETE) {
return submitNext(dagId);
} else if (jobStatus == FAILED) {
@@ -523,6 +532,14 @@ public class DagManager extends AbstractIdleService {
return !this.dagToJobs.get(dagId).isEmpty();
}
+ private ContextAwareCounter
getRunningJobsCounter(DagNode<JobExecutionPlan> dagNode) {
+ return metricContext.contextAwareCounter(
+ MetricRegistry.name(
+ MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
+ ServiceMetricNames.RUNNING_FLOWS_COUNTER,
+ dagNode.getValue().getSpecExecutor().getUri().toString()));
+ }
+
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete
and update internal state.
*/
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 76f5b7f..cda6b52 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
@@ -45,9 +46,11 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -57,7 +60,6 @@ import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.ServiceMetricNames;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -83,6 +85,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
protected final Optional<DagManager> dagManager;
protected final MetricContext metricContext;
+
protected final Optional<EventSubmitter> eventSubmitter;
@Getter
private Optional<Meter> flowOrchestrationSuccessFulMeter;
@@ -230,7 +233,15 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
_log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup,
flowName);
+ // We send a gauge with value 0 signifying that the flow could not be
compiled because previous execution is already running
+ metricContext.newContextAwareGauge(
+
MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
flowGroup, flowName, ServiceMetricNames.COMPILED),
+ () -> 0L);
return;
+ } else {
+ metricContext.newContextAwareGauge(
+
MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
flowGroup, flowName, ServiceMetricNames.COMPILED),
+ () -> 1L);
}
Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);