This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2d3095a53 [GOBBLIN-1793] Add metrics to measure and isolate bottleneck
for init… (#3652)
2d3095a53 is described below
commit 2d3095a5366f7e3adf63579c8c237854d5630990
Author: umustafi <[email protected]>
AuthorDate: Fri Mar 3 18:42:24 2023 -0800
[GOBBLIN-1793] Add metrics to measure and isolate bottleneck for init…
(#3652)
* [GOBBLIN-1793] Add metrics to measure and isolate bottleneck for
initializing scheduler
* remove unnecessary checks
* convert millisecond measurement to nanosecond
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/runtime/metrics/RuntimeMetrics.java | 15 ++++++-
.../scheduler/GobblinServiceJobScheduler.java | 49 +++++++++++++++++-----
2 files changed, 51 insertions(+), 13 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index b24ef000d..3ad15c933 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -54,9 +54,20 @@ public class RuntimeMetrics {
public static final String
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.time.to.check.quota";
- public static final String
GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.getSpecSpeedDuringStartupAvgMillis";
+ // The following metrics are used to identify the bottlenecks for
initializing the job scheduler
+ public static final String
GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.getSpecSpeedDuringStartupAvgNanos";
public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.loadSpecBatchSize";
- public static final String
GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.timeToInitializeSchedulerMillis";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.timeToInitializeSchedulerNanos";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.timeToObtainSpecUrisNanos";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.individualGetSpecSpeedNanos";
+ public static final String GOBBLIN_JOB_SCHEDULER_ADD_SPEC_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.totalAddSpecTimeNanos";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_FLOW_COMPILATION_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.specCompilationTimeNanos";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_TIME_TO_SCHEDULE_ONE_JOB_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.timeToScheduleOneJobNanos";
// Metadata keys
public static final String TOPIC = "topic";
public static final String GROUP_ID = "groupId";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 54b9a6a26..8c64ed129 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -113,9 +113,19 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
private String serviceName;
private volatile Long averageGetSpecTimeValue = -1L;
private volatile Long timeToInitializeSchedulerValue = -1L;
- private final ContextAwareGauge averageGetSpecTimeMillis =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS,
() -> this.averageGetSpecTimeValue);;
+ private volatile Long timeToObtainSpecUrisNanosValue = -1L;
+ private volatile Long individualGetSpecSpeedNanosValue = -1L;
+ private volatile Long addSpecTimeNanosValue = -1L;
+ private volatile Long flowCompilationTimeNanosValue = -1L;
+ private volatile Long timeToScheduleOneJobValue = -1L;
+ private final ContextAwareGauge averageGetSpecTimeNanos =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS,
() -> this.averageGetSpecTimeValue);;
private final ContextAwareGauge batchSize =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE,
() -> this.loadSpecsBatchSize);
- private final ContextAwareGauge timeToInitalizeSchedulerMillis =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS,
() -> this.timeToInitializeSchedulerValue);
+ private final ContextAwareGauge timeToInitalizeSchedulerNanos =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS,
() -> this.timeToInitializeSchedulerValue);
+ private final ContextAwareGauge timeToObtainSpecUrisNanos =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS,
() -> timeToObtainSpecUrisNanosValue);
+ private final ContextAwareGauge individualGetSpecSpeedNanos =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS,
() -> individualGetSpecSpeedNanosValue);
+ private final ContextAwareGauge addSpecTimeNanos =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_ADD_SPEC_TIME_NANOS,
() -> addSpecTimeNanosValue);
+ private final ContextAwareGauge flowCompilationTimeNanos =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_FLOW_COMPILATION_TIME_NANOS,
() -> flowCompilationTimeNanosValue);
+ private final ContextAwareGauge timeToScheduleOneJob =
metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_SCHEDULE_ONE_JOB_NANOS,
() -> timeToScheduleOneJobValue);
private static final MetricContext metricContext =
Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(),
GobblinServiceJobScheduler.class);
private static final ContextAwareMeter scheduledFlows =
metricContext.contextAwareMeter(ServiceMetricNames.SCHEDULED_FLOW_METER);
@@ -123,7 +133,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
/**
* If current instances is nominated as a handler for DR traffic from down
GaaS-Instance.
- * Note this is, currently, different from leadership change/fail-over
handling, where the traffice could come
+ * Note this is, currently, different from leadership change/fail-over
handling, where the traffic could come
* from GaaS instance out of current GaaS Cluster:
* e.g. There are multi-datacenter deployment of GaaS Cluster.
Intra-datacenter fail-over could be handled by
* leadership change mechanism, while inter-datacenter fail-over would be
handled by DR handling mechanism.
@@ -158,11 +168,16 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
this.quotaManager = quotaManager;
// Check that these metrics do not exist before adding, mainly for testing
purpose which creates multiple instances
// of the scheduler. If one metric exists, then the others should as well.
- MetricFilter filter =
MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS);
+ MetricFilter filter =
MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS);
if (metricContext.getGauges(filter).isEmpty()) {
- metricContext.register(this.averageGetSpecTimeMillis);
+ metricContext.register(this.averageGetSpecTimeNanos);
metricContext.register(this.batchSize);
- metricContext.register(timeToInitalizeSchedulerMillis);
+ metricContext.register(this.timeToInitalizeSchedulerNanos);
+ metricContext.register(this.timeToObtainSpecUrisNanos);
+ metricContext.register(this.individualGetSpecSpeedNanos);
+ metricContext.register(this.addSpecTimeNanos);
+ metricContext.register(this.flowCompilationTimeNanos);
+ metricContext.register(this.timeToScheduleOneJob);
}
}
@@ -237,7 +252,8 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
*/
private void scheduleSpecsFromCatalog() {
int numSpecs = this.flowCatalog.get().getSize();
- long startTime = System.currentTimeMillis();
+ _log.info("Scheduling specs from catalog: {} flows to schedule", numSpecs);
+ long startTime = System.nanoTime();
Iterator<URI> uriIterator;
HashSet<URI> urisLeftToSchedule = new HashSet<>();
try {
@@ -248,6 +264,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
} catch (IOException e) {
throw new RuntimeException(e);
}
+ this.timeToObtainSpecUrisNanosValue = System.nanoTime() - startTime;
try {
// If current instances nominated as DR handler, will take additional
URIS from FlowCatalog.
@@ -266,10 +283,10 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
long batchGetEndTime;
while (startOffset < numSpecs) {
- batchGetStartTime = System.currentTimeMillis();
+ batchGetStartTime = System.nanoTime();
Collection<Spec> batchOfSpecs =
this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
- batchGetEndTime = System.currentTimeMillis();
+ batchGetEndTime = System.nanoTime();
while (batchOfSpecsIterator.hasNext()) {
Spec spec = batchOfSpecsIterator.next();
@@ -289,10 +306,13 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
// Ensure we did not miss any specs due to ordering changing
(deletions/insertions) while loading
Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+ long individualGetSpecStartTime;
while (urisLeft.hasNext()) {
URI uri = urisLeft.next();
try {
+ individualGetSpecStartTime = System.nanoTime();
Spec spec = this.flowCatalog.get().getSpecWrapper(uri);
+ this.individualGetSpecSpeedNanosValue = System.nanoTime() -
individualGetSpecStartTime;
addSpecHelperMethod(spec);
} catch (Exception e) {
// If there is an uncaught error thrown during compilation, log it
and continue adding flows
@@ -302,7 +322,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
- this.timeToInitializeSchedulerValue = System.currentTimeMillis() -
startTime;
+ this.timeToInitializeSchedulerValue = System.nanoTime() - startTime;
}
/**
@@ -365,6 +385,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
*/
@Override
public AddSpecResponse onAddSpec(Spec addedSpec) {
+ long startTime = System.nanoTime();
if (this.helixManager.isPresent() &&
!this.helixManager.get().isConnected()) {
// Specs in store will be notified when Scheduler is added as listener
to FlowCatalog, so ignore
// .. Specs if in cluster mode and Helix is not yet initialized
@@ -384,8 +405,10 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
boolean isExplain = flowSpec.isExplain();
String response = null;
+ long compileStartTime = System.nanoTime();
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag =
this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+ this.flowCompilationTimeNanosValue = System.nanoTime() - compileStartTime;
// If dag is null then a compilation error has occurred
if (dag != null && !dag.isEmpty()) {
response = dag.toString();
@@ -400,7 +423,6 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
addedSpec, isExplain, compileSuccess, this.isActive);
return new AddSpecResponse<>(response);
}
-
// Check quota limits against adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer
when we accept the flow
if (!this.warmStandbyEnabled &&
!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
@@ -430,6 +452,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
||
(this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) &&
!isRunImmediately)) {
_log.warn("Ignoring the spec {} modified at time {} because we have a
more updated version from time {}",
addedSpec,
modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+ this.addSpecTimeNanosValue = System.nanoTime() - startTime;
return new AddSpecResponse(response);
}
}
@@ -441,11 +464,14 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
_log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
try {
+ long scheduleStartTime = System.nanoTime();
scheduleJob(jobConfig, null);
+ this.timeToScheduleOneJobValue = System.nanoTime() - scheduleStartTime;
} catch (JobException je) {
_log.error("{} Failed to schedule or run FlowSpec {}", serviceName,
addedSpec, je);
this.scheduledFlowSpecs.remove(addedSpec.getUri().toString());
this.lastUpdatedTimeForFlowSpec.remove(flowSpecUri.toString());
+ this.addSpecTimeNanosValue = System.nanoTime() - startTime;
return null;
}
if (PropertiesUtils.getPropAsBoolean(jobConfig,
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
@@ -457,6 +483,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true,
jobConfig, null));
}
+ this.addSpecTimeNanosValue = System.nanoTime() - startTime;
return new AddSpecResponse<>(response);
}