This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 133831b  [GOBBLIN-771] add a few metrics for gobblin service
133831b is described below

commit 133831b702beb82fa9a7ecaa42991016375bb7e7
Author: Arjun <[email protected]>
AuthorDate: Thu May 23 23:50:55 2019 -0700

    [GOBBLIN-771] add a few metrics for gobblin service
    
    Closes #2635 from arjun4084346/gaasMetrics
---
 .../metrics/reporter/MetricReportReporter.java     |  1 -
 .../metrics/reporter/util/MetricReportUtils.java   |  3 +++
 .../gobblin/metrics}/ServiceMetricNames.java       |  9 +++++++-
 .../service/FlowConfigResourceLocalHandler.java    | 24 ++++++++++++++++++++++
 .../service/FlowConfigV2ResourceLocalHandler.java  |  2 ++
 .../modules/flow/BaseFlowToJobSpecCompiler.java    |  2 +-
 .../service/modules/orchestration/DagManager.java  | 21 +++++++++++++++++--
 .../modules/orchestration/Orchestrator.java        | 13 +++++++++++-
 8 files changed, 69 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/MetricReportReporter.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/MetricReportReporter.java
index 75dea54..c971480 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/MetricReportReporter.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/MetricReportReporter.java
@@ -124,7 +124,6 @@ public abstract class MetricReportReporter extends 
ConfiguredScheduledReporter {
     for (Map.Entry<String, Timer> timer : timers.entrySet()) {
       metrics.addAll(serializeSnapshot(timer.getKey(), 
timer.getValue().getSnapshot()));
       metrics.addAll(serializeMetered(timer.getKey(), timer.getValue()));
-      metrics.addAll(serializeSingleValue(timer.getKey(), 
timer.getValue().getCount(), Measurements.COUNT.getName()));
     }
 
     Map<String, Object> allTags = Maps.newHashMap();
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index 34727fe..3cdb0a4 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -38,6 +38,9 @@ public class MetricReportUtils {
 
   public static final int SCHEMA_VERSION = 1;
   private static Optional<SpecificDatumReader<MetricReport>> READER = 
Optional.absent();
+  // This prefix can be used to distinguish metrics reported by GobblinService 
from other metrics reported by Gobblin
+  // This can be used in conjunction with MetricNameRegexFilter to filter out 
metrics in any MetricReporter
+  public static final String GOBBLIN_SERVICE_METRICS_PREFIX = "GobblinService";
 
   /**
    * Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte 
array representing a json input.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
similarity index 83%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
rename to 
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index f72093b..3a8ecaa 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.service;
+package org.apache.gobblin.metrics;
 
 public class ServiceMetricNames {
   private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
@@ -31,4 +31,11 @@ public class ServiceMetricNames {
 
   //Job status poll timer
   public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX 
+ "jobStatusPoll.time";
+
+  public static final String CREATE_FLOW_METER = "CreateFlow";
+  public static final String DELETE_FLOW_METER = "DeleteFlow";
+  public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
+
+  public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
+  public static final String COMPILED = "Compiled";
 }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index a82e760..fcabd63 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
 
+import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.Maps;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.common.ComplexResourceKey;
@@ -39,9 +40,16 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+
 
 /**
  * A {@link FlowConfigsResourceHandler} that handles Restli locally.
@@ -51,8 +59,19 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
   public static final Schedule NEVER_RUN_CRON_SCHEDULE = new 
Schedule().setCronSchedule("0 0 0 ? 1 1 2050");
   @Getter
   protected FlowCatalog flowCatalog;
+  protected final ContextAwareMeter createFlow;
+  protected final ContextAwareMeter deleteFlow;
+  protected final ContextAwareMeter runImmediatelyFlow;
+
   public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) {
     this.flowCatalog = flowCatalog;
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.createFlow = metricContext.contextAwareMeter(
+        MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, 
ServiceMetricNames.CREATE_FLOW_METER));
+    this.deleteFlow = metricContext.contextAwareMeter(
+        MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, 
ServiceMetricNames.DELETE_FLOW_METER));
+    this.runImmediatelyFlow = metricContext.contextAwareMeter(
+        MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, 
ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER));
   }
 
   /**
@@ -108,6 +127,10 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
    */
   public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean 
triggerListener) throws FlowConfigLoggedException {
     log.info("[GAAS-REST] Create called with flowGroup " + 
flowConfig.getId().getFlowGroup() + " flowName " + 
flowConfig.getId().getFlowName());
+    this.createFlow.mark();
+    if (!flowConfig.hasSchedule() || 
StringUtils.isEmpty(flowConfig.getSchedule().getCronSchedule())) {
+      this.runImmediatelyFlow.mark();
+    }
 
     if (flowConfig.hasExplain()) {
       //Return Error if FlowConfig has explain set. Explain request is only 
valid for v2 FlowConfig.
@@ -171,6 +194,7 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
   public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header, 
boolean triggerListener) throws FlowConfigLoggedException {
 
     log.info("[GAAS-REST] Delete called with flowGroup {} flowName {}", 
flowId.getFlowGroup(), flowId.getFlowName());
+    this.deleteFlow.mark();
     URI flowUri = null;
 
     try {
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index c0c7fa6..b1e8eeb 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -43,6 +43,8 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
    */
   public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean 
triggerListener) throws FlowConfigLoggedException {
     String createLog = "[GAAS-REST] Create called with flowGroup " + 
flowConfig.getId().getFlowGroup() + " flowName " + 
flowConfig.getId().getFlowName();
+    this.createFlow.mark();
+
     if (flowConfig.hasExplain()) {
       createLog += " explain " + Boolean.toString(flowConfig.isExplain());
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 16bb04c..8374211 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -56,7 +56,7 @@ import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
 import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.ServiceMetricNames;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5210778..e8f45d5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -46,15 +47,17 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceMetricNames;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -278,7 +281,6 @@ public class DagManager extends AbstractIdleService {
         this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
         this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
         this.jobStatusPolledTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
-
       } else {
         this.metricContext = null;
         this.eventSubmitter = Optional.absent();
@@ -463,6 +465,9 @@ public class DagManager extends AbstractIdleService {
         // either successfully or unsuccessfully. To catch any exceptions in 
the job submission, the DagManagerThread
         // blocks (by calling Future#get()) until the submission is completed.
         producer.addSpec(jobSpec).get();
+        if (this.metricContext != null) {
+          getRunningJobsCounter(dagNode).inc();
+        }
 
         if (jobOrchestrationTimer != null) {
           jobOrchestrationTimer.stop(jobMetadata);
@@ -490,6 +495,10 @@ public class DagManager extends AbstractIdleService {
       ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
       log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, 
jobStatus.name());
 
+      if (this.metricContext != null) {
+        getRunningJobsCounter(dagNode).dec();
+      }
+
       if (jobStatus == COMPLETE) {
         return submitNext(dagId);
       } else if (jobStatus == FAILED) {
@@ -523,6 +532,14 @@ public class DagManager extends AbstractIdleService {
       return !this.dagToJobs.get(dagId).isEmpty();
     }
 
+    private ContextAwareCounter 
getRunningJobsCounter(DagNode<JobExecutionPlan> dagNode) {
+      return metricContext.contextAwareCounter(
+          MetricRegistry.name(
+              MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
+              ServiceMetricNames.RUNNING_FLOWS_COUNTER,
+              dagNode.getValue().getSpecExecutor().getUri().toString()));
+    }
+
     /**
      * Perform clean up. Remove a dag from the dagstore if the dag is complete 
and update internal state.
      */
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 76f5b7f..cda6b52 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -45,9 +46,11 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -57,7 +60,6 @@ import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -83,6 +85,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   protected final Optional<DagManager> dagManager;
 
   protected final MetricContext metricContext;
+
   protected final Optional<EventSubmitter> eventSubmitter;
   @Getter
   private Optional<Meter> flowOrchestrationSuccessFulMeter;
@@ -230,7 +233,15 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
         _log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
             + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
+        // We send a gauge with value 0 signifying that the flow could not be 
compiled because previous execution is already running
+        metricContext.newContextAwareGauge(
+            
MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, 
flowGroup, flowName, ServiceMetricNames.COMPILED),
+            () -> 0L);
         return;
+      } else {
+        metricContext.newContextAwareGauge(
+            
MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, 
flowGroup, flowName, ServiceMetricNames.COMPILED),
+            () -> 1L);
       }
 
       Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);

Reply via email to