This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a445b0d add a few metrics in GaaS to help debug point of failure
(#3342)
a445b0d is described below
commit a445b0ddcc38441e8768884f07695ac849dcc7fe
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Aug 2 21:29:58 2021 +0530
add a few metrics in GaaS to help debug point of failure (#3342)
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 4 +++-
.../gobblin/service/SimpleKafkaSpecProducer.java | 24 ++++++++++++++++++++++
.../modules/orchestration/Orchestrator.java | 4 ++++
.../scheduler/GobblinServiceJobScheduler.java | 12 +++++++++++
4 files changed, 43 insertions(+), 1 deletion(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 7e6bb55..fb34425 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -42,7 +42,9 @@ public class ServiceMetricNames {
public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
public static final String FAILED_FLOW_METER = "FailedFlows";
-
+ public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX +
".ScheduledFlows";
+ public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX
+ ".NonScheduledFlows";
+ public static final String SKIPPED_FLOWS = GOBBLIN_SERVICE_PREFIX +
".SkippedFlows";
public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
public static final String SERVICE_USERS = "ServiceUsers";
public static final String COMPILED = "Compiled";
diff --git
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index 5f1445f..0548a20 100644
---
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -28,11 +28,17 @@ import java.util.Properties;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
@@ -61,9 +67,19 @@ public class SimpleKafkaSpecProducer implements
SpecProducer<Spec>, Closeable {
private Config _config;
private final String _kafkaProducerClassName;
+ private Meter addSpecMeter;
+ private Meter deleteSpecMeter;
+ private Meter updateSpecMeter;
+ private Meter cancelSpecMeter;
+ private MetricContext metricContext = Instrumented.getMetricContext(new
State(), getClass());
+
public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) {
_kafkaProducerClassName = ConfigUtils.getString(config,
KAFKA_DATA_WRITER_CLASS_KEY,
DEFAULT_KAFKA_DATA_WRITER_CLASS);
+ this.addSpecMeter = createMeter("-Add");
+ this.deleteSpecMeter = createMeter("-Delete");
+ this.updateSpecMeter = createMeter("-Update");
+ this.cancelSpecMeter = createMeter("-Cancel");
try {
_serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new
FixedSchemaVersionWriter());
@@ -82,11 +98,16 @@ public class SimpleKafkaSpecProducer implements
SpecProducer<Spec>, Closeable {
this(config, Optional.<Logger>absent());
}
+ private Meter createMeter(String suffix) {
+ return
this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
getClass().getSimpleName(), suffix));
+ }
+
@Override
public Future<?> addSpec(Spec addedSpec) {
AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec,
SpecExecutor.Verb.ADD);
log.info("Adding Spec: " + addedSpec + " using Kafka.");
+ this.addSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
new KafkaWriteCallback(avroJobSpec));
}
@@ -96,6 +117,7 @@ public class SimpleKafkaSpecProducer implements
SpecProducer<Spec>, Closeable {
AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec,
SpecExecutor.Verb.UPDATE);
log.info("Updating Spec: " + updatedSpec + " using Kafka.");
+ this.updateSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
new KafkaWriteCallback(avroJobSpec));
}
@@ -108,6 +130,7 @@ public class SimpleKafkaSpecProducer implements
SpecProducer<Spec>, Closeable {
.setProperties(Maps.fromProperties(headers)).build();
log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
+ this.deleteSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
new KafkaWriteCallback(avroJobSpec));
}
@@ -119,6 +142,7 @@ public class SimpleKafkaSpecProducer implements
SpecProducer<Spec>, Closeable {
.setProperties(Maps.fromProperties(properties)).build();
log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
+ this.cancelSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec),
new KafkaWriteCallback(avroJobSpec));
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 635337c..b256ae2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -96,6 +96,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private Optional<Meter> flowOrchestrationFailedMeter;
@Getter
private Optional<Timer> flowOrchestrationTimer;
+ private Optional<Meter> skippedFlowsMeter;
@Setter
private FlowStatusGenerator flowStatusGenerator;
@@ -136,12 +137,14 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
this.flowOrchestrationSuccessFulMeter =
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
this.flowOrchestrationFailedMeter =
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
this.flowOrchestrationTimer =
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
+ this.skippedFlowsMeter =
Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build());
} else {
this.metricContext = null;
this.flowOrchestrationSuccessFulMeter = Optional.absent();
this.flowOrchestrationFailedMeter = Optional.absent();
this.flowOrchestrationTimer = Optional.absent();
+ this.skippedFlowsMeter = Optional.absent();
this.eventSubmitter = Optional.absent();
}
this.flowConcurrencyFlag = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
@@ -235,6 +238,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
_log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup,
flowName);
flowGauges.get(spec.getUri().toString()).setState(CompiledState.SKIPPED);
+ Instrumented.markMeter(this.skippedFlowsMeter);
// Send FLOW_FAILED event
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index f83bf7d..d9b778e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.helix.HelixManager;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
@@ -50,6 +51,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -97,6 +101,10 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
@Getter
private volatile boolean isActive;
private String serviceName;
+ private static final MetricContext metricContext =
Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(),
+ GobblinServiceJobScheduler.class);
+ private static final ContextAwareMeter scheduledFlows =
metricContext.contextAwareMeter(ServiceMetricNames.SCHEDULED_FLOW_METER);
+ private static final ContextAwareMeter nonScheduledFlows =
metricContext.contextAwareMeter(ServiceMetricNames.NON_SCHEDULED_FLOW_METER);;
/**
* If current instances is nominated as a handler for DR traffic from down
GaaS-Instance.
@@ -435,6 +443,8 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {
throw new JobExecutionException(t);
+ } finally {
+ scheduledFlows.mark();
}
}
@@ -481,6 +491,8 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
_log.error("Failed to run job " +
this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
} catch (InterruptedException e) {
_log.error("Failed to delete the spec " + specUri, e);
+ } finally {
+ nonScheduledFlows.mark();
}
}
}