This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a445b0d  add a few metrics in GaaS to help debug point of failure (#3342)
a445b0d is described below

commit a445b0ddcc38441e8768884f07695ac849dcc7fe
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Aug 2 21:29:58 2021 +0530

    add a few metrics in GaaS to help debug point of failure (#3342)
---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  4 +++-
 .../gobblin/service/SimpleKafkaSpecProducer.java   | 24 ++++++++++++++++++++++
 .../modules/orchestration/Orchestrator.java        |  4 ++++
 .../scheduler/GobblinServiceJobScheduler.java      | 12 +++++++++++
 4 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 7e6bb55..fb34425 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -42,7 +42,9 @@ public class ServiceMetricNames {
   public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
   public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
   public static final String FAILED_FLOW_METER = "FailedFlows";
-
+  public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + ".ScheduledFlows";
+  public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + ".NonScheduledFlows";
+  public static final String SKIPPED_FLOWS = GOBBLIN_SERVICE_PREFIX + ".SkippedFlows";
   public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
   public static final String SERVICE_USERS = "ServiceUsers";
   public static final String COMPILED = "Compiled";
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index 5f1445f..0548a20 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -28,11 +28,17 @@ import java.util.Properties;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.slf4j.Logger;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
 import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
 import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
@@ -61,9 +67,19 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
   private Config _config;
   private final String _kafkaProducerClassName;
 
+  private Meter addSpecMeter;
+  private Meter deleteSpecMeter;
+  private Meter updateSpecMeter;
+  private Meter cancelSpecMeter;
+  private MetricContext metricContext = Instrumented.getMetricContext(new State(), getClass());
+
   public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) {
     _kafkaProducerClassName = ConfigUtils.getString(config, KAFKA_DATA_WRITER_CLASS_KEY,
         DEFAULT_KAFKA_DATA_WRITER_CLASS);
+    this.addSpecMeter = createMeter("-Add");
+    this.deleteSpecMeter = createMeter("-Delete");
+    this.updateSpecMeter = createMeter("-Update");
+    this.cancelSpecMeter = createMeter("-Cancel");
 
     try {
       _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter());
@@ -82,11 +98,16 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
     this(config, Optional.<Logger>absent());
   }
 
+  private Meter createMeter(String suffix) {
+    return this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, getClass().getSimpleName(), suffix));
+  }
+
   @Override
   public Future<?> addSpec(Spec addedSpec) {
     AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
 
     log.info("Adding Spec: " + addedSpec + " using Kafka.");
+    this.addSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }
@@ -96,6 +117,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
     AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
 
     log.info("Updating Spec: " + updatedSpec + " using Kafka.");
+    this.updateSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }
@@ -108,6 +130,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
         .setProperties(Maps.fromProperties(headers)).build();
 
     log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
+    this.deleteSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }
@@ -119,6 +142,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
         .setProperties(Maps.fromProperties(properties)).build();
 
     log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
+    this.cancelSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 635337c..b256ae2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -96,6 +96,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   private Optional<Meter> flowOrchestrationFailedMeter;
   @Getter
   private Optional<Timer> flowOrchestrationTimer;
+  private Optional<Meter> skippedFlowsMeter;
   @Setter
   private FlowStatusGenerator flowStatusGenerator;
 
@@ -136,12 +137,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
       this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
       this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
+      this.skippedFlowsMeter = Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
       this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
     } else {
       this.metricContext = null;
       this.flowOrchestrationSuccessFulMeter = Optional.absent();
       this.flowOrchestrationFailedMeter = Optional.absent();
       this.flowOrchestrationTimer = Optional.absent();
+      this.skippedFlowsMeter = Optional.absent();
       this.eventSubmitter = Optional.absent();
     }
     this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
@@ -235,6 +238,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
         _log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
             + "concurrent executions are disabled for this flow.", flowGroup, flowName);
         flowGauges.get(spec.getUri().toString()).setState(CompiledState.SKIPPED);
+        Instrumented.markMeter(this.skippedFlowsMeter);
 
         // Send FLOW_FAILED event
         Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index f83bf7d..d9b778e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.helix.HelixManager;
 import org.quartz.DisallowConcurrentExecution;
 import org.quartz.InterruptableJob;
@@ -50,6 +51,9 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -97,6 +101,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   @Getter
   private volatile boolean isActive;
   private String serviceName;
+  private static final MetricContext metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(),
+      GobblinServiceJobScheduler.class);
+  private static final ContextAwareMeter scheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.SCHEDULED_FLOW_METER);
+  private static final ContextAwareMeter nonScheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.NON_SCHEDULED_FLOW_METER);;
 
   /**
    * If current instances is nominated as a handler for DR traffic from down GaaS-Instance.
@@ -435,6 +443,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         jobScheduler.runJob(jobProps, jobListener);
       } catch (Throwable t) {
         throw new JobExecutionException(t);
+      } finally {
+        scheduledFlows.mark();
       }
     }
 
@@ -481,6 +491,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         _log.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
       } catch (InterruptedException e) {
       _log.error("Failed to delete the spec " + specUri, e);
+      } finally {
+        nonScheduledFlows.mark();
       }
     }
   }

Reply via email to