This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 35f16cb44 [GOBBLIN-1672] Refactor metrics from DagManager into its own
class, add metrics per … (#3532)
35f16cb44 is described below
commit 35f16cb44d58dc5eca102e3633fcca2c9722bfaf
Author: William Lo <[email protected]>
AuthorDate: Tue Aug 9 15:02:31 2022 -0700
[GOBBLIN-1672] Refactor metrics from DagManager into its own class, add
metrics per … (#3532)
* Refactor metrics from DagManager into its own class, add metrics per
executor for SUCCESS, FAILED, SLA_EXCEEDED, START_SLA_EXCEEDED
* Address review, fix flow gauge for failed flows
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 2 +
.../service/modules/orchestration/DagManager.java | 200 ++++------------
.../modules/orchestration/DagManagerMetrics.java | 251 +++++++++++++++++++++
.../modules/orchestration/DagManagerTest.java | 182 ++++++++++++++-
.../modules/orchestration/TestServiceMetrics.java | 2 +-
5 files changed, 468 insertions(+), 169 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 3bd4083a1..eb8ff896c 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -42,6 +42,7 @@ public class ServiceMetricNames {
public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
public static final String START_SLA_EXCEEDED_FLOWS_METER =
"StartSLAExceededFlows";
+ public static final String SLA_EXCEEDED_FLOWS_METER = "SlaExceededFlows";
public static final String FAILED_FLOW_METER = "FailedFlows";
public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX +
".ScheduledFlows";
public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX
+ ".NonScheduledFlows";
@@ -50,6 +51,7 @@ public class ServiceMetricNames {
public static final String SERVICE_USERS = "ServiceUsers";
public static final String COMPILED = "Compiled";
public static final String RUNNING_STATUS = "RunningStatus";
+ public static final String JOBS_SENT_TO_SPEC_EXECUTOR =
"JobsSentToSpecExecutor";
public static final String HELIX_LEADER_STATE = "HelixLeaderState";
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index dcfbe9cdf..f423277cf 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -18,7 +18,6 @@
package org.apache.gobblin.service.modules.orchestration;
import com.codahale.metrics.Meter;
-import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -40,9 +39,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.StringUtils;
-
-import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
@@ -61,15 +57,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -77,8 +69,6 @@ import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.RequesterService;
-import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -186,6 +176,7 @@ public class DagManager extends AbstractIdleService {
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
private final long failedDagRetentionTime;
+ private final DagManagerMetrics dagManagerMetrics;
private volatile boolean isActive = false;
@@ -199,12 +190,14 @@ public class DagManager extends AbstractIdleService {
this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.retentionPollingInterval = ConfigUtils.getInt(config,
FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
+ MetricContext metricContext = null;
if (instrumentationEnabled) {
- MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
+ metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
} else {
this.eventSubmitter = Optional.absent();
}
+ this.dagManagerMetrics = new DagManagerMetrics(metricContext);
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
@@ -355,18 +348,7 @@ public class DagManager extends AbstractIdleService {
topologySpecMap);
Set<String> failedDagIds =
Collections.synchronizedSet(failedDagStateStore.getDagIds());
- ContextAwareMeter allSuccessfulMeter = null;
- ContextAwareMeter allFailedMeter = null;
- // TODO: create a map of RatioGauges for success/failed per executor,
will require preprocessing of available executors
- Map<String, ContextAwareMeter> startSlaExceededMeters =
Maps.newConcurrentMap();
-
- if (instrumentationEnabled) {
- MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
- allSuccessfulMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
- ServiceMetricNames.SUCCESSFUL_FLOW_METER));
- allFailedMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
- ServiceMetricNames.FAILED_FLOW_METER));
- }
+ this.dagManagerMetrics.activate();
UserQuotaManager quotaManager = new UserQuotaManager(config);
// Before initializing the DagManagerThreads check which dags are
currently running before shutdown
@@ -382,8 +364,8 @@ public class DagManager extends AbstractIdleService {
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
- runQueue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, failedDagIds, allSuccessfulMeter,
- allFailedMeter, startSlaExceededMeters,
this.defaultJobStartSlaTimeMillis, quotaManager, i);
+ runQueue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, failedDagIds, this.dagManagerMetrics,
+ this.defaultJobStartSlaTimeMillis, quotaManager, i);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
}
@@ -397,9 +379,7 @@ public class DagManager extends AbstractIdleService {
} else { //Mark the DagManager inactive.
log.info("Inactivating the DagManager. Shutting down all DagManager
threads");
this.scheduledExecutorPool.shutdown();
- // The DMThread's metrics mappings follow the lifecycle of the
DMThread itself and so are lost by DM deactivation-reactivation but the
RootMetricContext is a (persistent) singleton.
- // To avoid IllegalArgumentException by the RMC preventing (re-)add of
a metric already known, remove all metrics that a new DMThread thread would
attempt to add (in DagManagerThread::initialize) whenever running
post-re-enablement
-
RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+ this.dagManagerMetrics.cleanup();
try {
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
@@ -412,11 +392,6 @@ public class DagManager extends AbstractIdleService {
}
}
- @VisibleForTesting
- protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
- return new MetricNameRegexFilter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX
+ "\\..*\\." + ServiceMetricNames.RUNNING_STATUS);
- }
-
/**
* Each {@link DagManagerThread} performs 2 actions when scheduled:
* <ol>
@@ -439,12 +414,7 @@ public class DagManager extends AbstractIdleService {
private final Optional<EventSubmitter> eventSubmitter;
private final Optional<Timer> jobStatusPolledTimer;
private final AtomicLong orchestrationDelay = new AtomicLong(0);
- private static final Map<String, FlowState> flowGauges =
Maps.newConcurrentMap();
- private final ContextAwareMeter allSuccessfulMeter;
- private final ContextAwareMeter allFailedMeter;
- private static final Map<String, ContextAwareMeter> groupSuccessfulMeters
= Maps.newConcurrentMap();
- private static final Map<String, ContextAwareMeter> groupFailureMeters =
Maps.newConcurrentMap();
- private final Map<String, ContextAwareMeter> startSlaExceededMeters;
+ private final DagManagerMetrics dagManagerMetrics;
private final UserQuotaManager quotaManager;
private final JobStatusRetriever jobStatusRetriever;
private final DagStateStore dagStateStore;
@@ -459,8 +429,7 @@ public class DagManager extends AbstractIdleService {
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, BlockingQueue<String> resumeQueue,
- boolean instrumentationEnabled, Set<String> failedDagIds,
ContextAwareMeter allSuccessfulMeter,
- ContextAwareMeter allFailedMeter, Map<String, ContextAwareMeter>
startSlaExceededMeters,
+ boolean instrumentationEnabled, Set<String> failedDagIds,
DagManagerMetrics dagManagerMetrics,
Long defaultJobStartSla, UserQuotaManager quotaManager, int
dagMangerThreadId) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
@@ -469,11 +438,9 @@ public class DagManager extends AbstractIdleService {
this.queue = queue;
this.cancelQueue = cancelQueue;
this.resumeQueue = resumeQueue;
- this.allSuccessfulMeter = allSuccessfulMeter;
- this.allFailedMeter = allFailedMeter;
+ this.dagManagerMetrics = dagManagerMetrics;
this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
this.quotaManager = quotaManager;
- this.startSlaExceededMeters = startSlaExceededMeters;
if (instrumentationEnabled) {
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
@@ -673,22 +640,13 @@ public class DagManager extends AbstractIdleService {
if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
addJobState(dagId, dagNode);
//Update the running jobs counter.
- getRunningJobsCounter(dagNode).inc();
-
getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+ dagManagerMetrics.incrementRunningJobMetrics(dagNode);
isDagRunning = true;
}
}
FlowId flowId = DagManagerUtils.getFlowId(dag);
- // Do not register flow-specific metrics for a flow
- if (!flowGauges.containsKey(flowId.toString()) &&
DagManagerUtils.shouldFlowOutputMetrics(dag)) {
- String flowStateGaugeName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
flowId.getFlowGroup(),
- flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
- flowGauges.put(flowId.toString(), FlowState.RUNNING);
- ContextAwareGauge<Integer> gauge = RootMetricContext
- .get().newContextAwareGauge(flowStateGaugeName, () ->
flowGauges.get(flowId.toString()).value);
- RootMetricContext.get().register(flowStateGaugeName, gauge);
- }
+ this.dagManagerMetrics.registerFlowMetric(flowId, dag);
log.debug("Dag {} submitting jobs ready for execution.",
DagManagerUtils.getFullyQualifiedDagName(dag));
//Determine the next set of jobs to run and submit them for execution
@@ -699,7 +657,7 @@ public class DagManager extends AbstractIdleService {
// Set flow status to running
DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag,
TimingEvent.FlowTimings.FLOW_RUNNING);
- conditionallyUpdateFlowGaugeExecutionState(flowGauges, flowId,
FlowState.RUNNING);
+ dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
FlowState.RUNNING);
// Report the orchestration delay the first time the Dag is initialized.
Orchestration delay is defined as
// the time difference between the instant when a flow first transitions
to the running state and the instant
@@ -805,14 +763,12 @@ public class DagManager extends AbstractIdleService {
DagManagerUtils.getJobName(node),
DagManagerUtils.getFullyQualifiedDagName(node),
timeOutForJobStart);
+ dagManagerMetrics.incrementCountsStartSlaExceeded(node);
cancelDagNode(node);
String dagId = DagManagerUtils.generateDagId(node);
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
this.dags.get(dagId).setMessage("Flow killed because no update
received for " + timeOutForJobStart + " ms after orchestration");
- if (this.metricContext != null) {
- this.getExecutorMeterForDag(node,
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER,
startSlaExceededMeters).mark();
- }
return true;
} else {
return false;
@@ -865,6 +821,7 @@ public class DagManager extends AbstractIdleService {
log.info("Flow {} exceeded the SLA of {} ms. Killing the job {}
now...",
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
flowSla,
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+ dagManagerMetrics.incrementExecutorSlaExceeded(node);
cancelDagNode(node);
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
@@ -964,9 +921,8 @@ public class DagManager extends AbstractIdleService {
// Quota release is guaranteed, despite failure, because exception
handling within would mark the job FAILED.
// When the ensuing kafka message spurs DagManager processing, the
quota is released and the counts decremented
// Ensure that we do not double increment for flows that are retried
- if (this.metricContext != null &&
dagNode.getValue().getCurrentAttempts() == 1) {
- getRunningJobsCounter(dagNode).inc();
-
getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+ if (dagNode.getValue().getCurrentAttempts() == 1) {
+ dagManagerMetrics.incrementRunningJobMetrics(dagNode);
}
// Submit the job to the SpecProducer, which in turn performs the
actual job submission to the SpecExecutor instance.
// The SpecProducer implementations submit the job to the underlying
executor and return when the submission is complete,
@@ -985,6 +941,7 @@ public class DagManager extends AbstractIdleService {
jobOrchestrationTimer.stop(jobMetadata);
}
log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
+ this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
@@ -1012,9 +969,8 @@ public class DagManager extends AbstractIdleService {
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId,
jobStatus.name());
// Only decrement counters and quota for jobs that actually ran on the
executor, not from a GaaS side failure/skip event
- if (quotaManager.releaseQuota(dagNode) && this.metricContext != null) {
- getRunningJobsCounter(dagNode).dec();
-
getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
+ if (quotaManager.releaseQuota(dagNode)) {
+ dagManagerMetrics.decrementRunningJobMetrics(dagNode);
}
switch (jobStatus) {
@@ -1025,6 +981,7 @@ public class DagManager extends AbstractIdleService {
} else {
this.failedDagIdsFinishAllPossible.add(dagId);
}
+ dagManagerMetrics.incrementExecutorFailed(dagNode);
return Maps.newHashMap();
case CANCELLED:
if (DagManagerUtils.getFailureOption(dag) ==
FailureOption.FINISH_RUNNING) {
@@ -1034,6 +991,7 @@ public class DagManager extends AbstractIdleService {
}
return Maps.newHashMap();
case COMPLETE:
+ dagManagerMetrics.incrementExecutorSuccess(dagNode);
return submitNext(dagId);
default:
log.warn("It should not reach here. Job status is unexpected.");
@@ -1064,61 +1022,6 @@ public class DagManager extends AbstractIdleService {
return dagNodes != null && !dagNodes.isEmpty();
}
- private ContextAwareCounter
getRunningJobsCounter(DagNode<JobExecutionPlan> dagNode) {
- return metricContext.contextAwareCounter(
- MetricRegistry.name(
- ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
- ServiceMetricNames.RUNNING_FLOWS_COUNTER,
- DagManagerUtils.getSpecExecutorName(dagNode)));
- }
-
- private List<ContextAwareCounter>
getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) {
- Config configs = dagNode.getValue().getJobSpec().getConfig();
- String proxy = ConfigUtils.getString(configs,
AzkabanProjectConfig.USER_TO_PROXY, null);
- List<ContextAwareCounter> counters = new ArrayList<>();
-
- if (StringUtils.isNotEmpty(proxy)) {
- counters.add(metricContext.contextAwareCounter(
- MetricRegistry.name(
- ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
- ServiceMetricNames.SERVICE_USERS, proxy)));
- }
-
- try {
- String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
- if (StringUtils.isNotEmpty(serializedRequesters)) {
- List<ServiceRequester> requesters =
RequesterService.deserialize(serializedRequesters);
- for (ServiceRequester requester : requesters) {
- counters.add(metricContext.contextAwareCounter(MetricRegistry
- .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.SERVICE_USERS, requester.getName())));
- }
- }
- } catch (IOException e) {
- log.error("Error while fetching requester list.", e);
- }
-
- return counters;
- }
-
- private ContextAwareMeter getGroupMeterForDag(String dagId, String
meterName, Map<String, ContextAwareMeter> meterMap) {
- String flowGroup =
DagManagerUtils.getFlowId(this.dags.get(dagId)).getFlowGroup();
- return meterMap.computeIfAbsent(flowGroup,
- group ->
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
group, meterName)));
- }
-
- /**
- * Used to track metrics for different specExecutors to detect issues with
the specExecutor itself
- * @param dagNode
- * @param meterName
- * @param meterMap
- * @return
- */
- private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan>
dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
- String executorName = DagManagerUtils.getSpecExecutorName(dagNode);
- return meterMap.computeIfAbsent(executorName,
- executorUri ->
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
executorUri, meterName)));
- }
-
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete
and update internal state.
*/
@@ -1132,24 +1035,27 @@ public class DagManager extends AbstractIdleService {
DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
deleteJobState(dagId, dagNode);
}
- log.info("Dag {} has finished with status FAILED; Cleaning up dag from
the state store.", dagId);
- onFlowFailure(dagId);
+ Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+ String status = TimingEvent.FlowTimings.FLOW_FAILED;
+ addFailedDag(dagId, dag);
+ log.info("Dag {} has finished with status {}; Cleaning up dag from the
state store.", dagId, status);
// send an event before cleaning up dag
- DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), status);
dagIdstoClean.add(dagId);
}
- //Clean up completed dags
- for (String dagId : this.dags.keySet()) {
+ // Remove dags that are finished and emit their appropriate metrics
+ for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair :
this.dags.entrySet()) {
+ String dagId = dagIdKeyPair.getKey();
+ Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
if (!hasRunningJobs(dagId) &&
!this.failedDagIdsFinishRunning.contains(dagId)) {
String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
- onFlowFailure(dagId);
status = TimingEvent.FlowTimings.FLOW_FAILED;
+ addFailedDag(dagId, dag);
this.failedDagIdsFinishAllPossible.remove(dagId);
- conditionallyUpdateFlowGaugeExecutionState(flowGauges,
DagManagerUtils.getFlowId(this.dags.get(dagId)), FlowState.FAILED);
} else {
- onFlowSuccess(dagId);
+
dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
}
log.info("Dag {} has finished with status {}; Cleaning up dag from
the state store.", dagId, status);
// send an event before cleaning up dag
@@ -1163,27 +1069,11 @@ public class DagManager extends AbstractIdleService {
}
}
- private void onFlowSuccess(String dagId) {
- if (this.metricContext != null) {
- conditionallyUpdateFlowGaugeExecutionState(flowGauges,
DagManagerUtils.getFlowId(this.dags.get(dagId)), FlowState.SUCCESSFUL);
- this.allSuccessfulMeter.mark();
- getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER,
groupSuccessfulMeters).mark();
- }
- }
-
- private void onFlowFailure(String dagId) {
- addFailedDag(dagId);
- if (this.metricContext != null) {
- conditionallyUpdateFlowGaugeExecutionState(flowGauges,
DagManagerUtils.getFlowId(this.dags.get(dagId)), FlowState.FAILED);
- this.allFailedMeter.mark();
- getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER,
groupFailureMeters).mark();
- }
- }
-
/**
* Add a dag to failed dag state store
*/
- private synchronized void addFailedDag(String dagId) {
+ private synchronized void addFailedDag(String dagId, Dag<JobExecutionPlan>
dag) {
+ FlowId flowId = DagManagerUtils.getFlowId(dag);
try {
log.info("Adding dag " + dagId + " to failed dag state store");
this.failedDagStateStore.writeCheckpoint(this.dags.get(dagId));
@@ -1191,6 +1081,12 @@ public class DagManager extends AbstractIdleService {
log.error("Failed to add dag " + dagId + " to failed dag state store",
e);
}
this.failedDagIds.add(dagId);
+ if
(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED.equals(dag.getFlowEvent()))
{
+ this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
+ } else if
(!TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED.equals(dag.getFlowEvent()))
{
+ dagManagerMetrics.emitFlowFailedMetrics(flowId);
+ }
+ this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
DagManager.FlowState.FAILED);
}
/**
@@ -1210,18 +1106,6 @@ public class DagManager extends AbstractIdleService {
this.dags.remove(dagId);
this.dagToJobs.remove(dagId);
}
-
- /**
- * Updates flowGauges with the appropriate state if the gauge is being
tracked for the flow
- * @param flowGauges
- * @param flowId
- * @param state
- */
- private void conditionallyUpdateFlowGaugeExecutionState(Map<String,
FlowState> flowGauges, FlowId flowId, FlowState state) {
- if (flowGauges.containsKey(flowId.toString())) {
- flowGauges.put(flowId.toString(), state);
- }
- }
}
public enum FlowState {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
new file mode 100644
index 000000000..acf7c0310
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class DagManagerMetrics {
+ private static final Map<String, DagManager.FlowState> flowGauges =
Maps.newConcurrentMap();
+ // Meters representing the total number of flows in a given state
+ private ContextAwareMeter allSuccessfulMeter;
+ private ContextAwareMeter allFailedMeter;
+ private ContextAwareMeter allRunningMeter;
+ private ContextAwareMeter allSlaExceededMeter;
+ private ContextAwareMeter allStartSlaExceededMeter;
+ // Meters representing the flows in a given state per flowgroup
+ private final Map<String, ContextAwareMeter> groupSuccessfulMeters =
Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> groupFailureMeters =
Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> groupStartSlaExceededMeters =
Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> groupSlaExceededMeters =
Maps.newConcurrentMap();
+
+ // Meters representing the jobs in a given state per executor
+ // These metrics need to be invoked differently to account for automated
retries and multihop scenarios.
+ private final Map<String, ContextAwareMeter> executorSuccessMeters =
Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> executorFailureMeters =
Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> executorStartSlaExceededMeters
= Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> executorSlaExceededMeters =
Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> executorJobSentMeters =
Maps.newConcurrentMap();
+ MetricContext metricContext;
+
+ public DagManagerMetrics(MetricContext metricContext) {
+ this.metricContext = metricContext;
+ }
+
+ public void activate() {
+ if (this.metricContext != null) {
+ allSuccessfulMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SUCCESSFUL_FLOW_METER));
+ allFailedMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.FAILED_FLOW_METER));
+ allStartSlaExceededMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER));
+ allSlaExceededMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
+ allRunningMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
+ }
+ }
+
+  public void registerFlowMetric(FlowId flowId, Dag<JobExecutionPlan> dag) {
+    // Skip adhoc flows; putIfAbsent atomically claims the flow so concurrent threads never register its gauge twice
+    if (DagManagerUtils.shouldFlowOutputMetrics(dag)
+        && flowGauges.putIfAbsent(flowId.toString(), DagManager.FlowState.RUNNING) == null) {
+      String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(),
+          flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
+      ContextAwareGauge<Integer> gauge = RootMetricContext
+          .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId.toString()).value);
+      RootMetricContext.get().register(flowStateGaugeName, gauge);
+    }
+  }
+
+ public void incrementRunningJobMetrics(Dag.DagNode<JobExecutionPlan>
dagNode) {
+ if (this.metricContext != null) {
+ this.getRunningJobsCounterForExecutor(dagNode).inc();
+
this.getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+ }
+ }
+
+ public void decrementRunningJobMetrics(Dag.DagNode<JobExecutionPlan>
dagNode) {
+ if (this.metricContext != null) {
+ this.getRunningJobsCounterForExecutor(dagNode).dec();
+
this.getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
+ }
+ }
+
+ /**
+ * Updates flowGauges with the appropriate state if the gauge is being
tracked for the flow
+ * @param flowId
+ * @param state
+ */
+ public void conditionallyMarkFlowAsState(FlowId flowId, DagManager.FlowState
state) {
+ if (flowGauges.containsKey(flowId.toString())) {
+ flowGauges.put(flowId.toString(), state);
+ }
+ }
+
+ public void emitFlowSuccessMetrics(FlowId flowId) {
+ if (this.metricContext != null) {
+ this.conditionallyMarkFlowAsState(flowId,
DagManager.FlowState.SUCCESSFUL);
+ this.allSuccessfulMeter.mark();
+ this.getGroupMeterForDag(flowId.getFlowGroup(),
ServiceMetricNames.SUCCESSFUL_FLOW_METER, groupSuccessfulMeters).mark();
+ }
+ }
+
+ public void emitFlowFailedMetrics(FlowId flowId) {
+ if (this.metricContext != null) {
+ this.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
+ this.allFailedMeter.mark();
+ this.getGroupMeterForDag(flowId.getFlowGroup(),
ServiceMetricNames.FAILED_FLOW_METER, groupFailureMeters).mark();
+ }
+ }
+
+ public void emitFlowSlaExceededMetrics(FlowId flowId) {
+ if (this.metricContext != null) {
+ this.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
+ this.allSlaExceededMeter.mark();
+ this.getGroupMeterForDag(flowId.getFlowGroup(),
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER, groupSlaExceededMeters).mark();
+ }
+ }
+
+ public void incrementExecutorSuccess(Dag.DagNode<JobExecutionPlan> node) {
+ if (this.metricContext != null) {
+ this.getExecutorMeterForDag(node,
ServiceMetricNames.SUCCESSFUL_FLOW_METER, executorSuccessMeters).mark();
+ }
+ }
+
+ public void incrementExecutorFailed(Dag.DagNode<JobExecutionPlan> node) {
+ if (this.metricContext != null) {
+ this.getExecutorMeterForDag(node, ServiceMetricNames.FAILED_FLOW_METER,
executorFailureMeters).mark();
+ }
+ }
+
+ public void incrementExecutorSlaExceeded(Dag.DagNode<JobExecutionPlan> node)
{
+ if (this.metricContext != null) {
+ this.getExecutorMeterForDag(node,
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER, executorSlaExceededMeters).mark();
+ }
+ }
+
+ public void incrementJobsSentToExecutor(Dag.DagNode<JobExecutionPlan> node) {
+ if (this.metricContext != null) {
+ this.getExecutorMeterForDag(node,
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR, executorJobSentMeters).mark();
+ this.allRunningMeter.mark();
+ }
+ }
+
+ /**
+  * Marks the start-SLA-exceeded meters for the node's flow group, for the service as a whole, and for the node's
+  * spec executor. The meters are only touched when a metric context is configured.
+  *
+  * <p>The counts are incremented during flow submission rather than cleanup to account for retries obfuscating
+  * this metric.
+  *
+  * @param node the job node whose start SLA was exceeded; its job spec config must contain the flow group key
+  */
+ public void incrementCountsStartSlaExceeded(Dag.DagNode<JobExecutionPlan> node) {
+   String flowGroup = node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY);
+   if (this.metricContext != null) {
+     // Looking the group meter up only creates it; it has to be marked explicitly like the other two meters.
+     this.getGroupMeterForDag(flowGroup, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER,
+         groupStartSlaExceededMeters).mark();
+     this.allStartSlaExceededMeter.mark();
+     this.getExecutorMeterForDag(node, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER,
+         executorStartSlaExceededMeters).mark();
+   }
+ }
+
+ /**
+  * Collects the per-user running counters that apply to a job node: one for the configured proxy user (when
+  * non-empty), followed by one for each requester deserialized from the node's serialized requester list.
+  * An {@link IOException} while fetching the requesters is logged and the counters gathered so far are returned.
+  *
+  * @param dagNode the job node whose users are looked up
+  * @return the counters for the proxy user and requesters; possibly empty
+  */
+ private List<ContextAwareCounter> getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
+   List<ContextAwareCounter> userCounters = new ArrayList<>();
+   Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+   String proxyUser = ConfigUtils.getString(jobConfig, AzkabanProjectConfig.USER_TO_PROXY, null);
+
+   if (StringUtils.isNotEmpty(proxyUser)) {
+     String proxyCounterName = MetricRegistry.name(
+         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, proxyUser);
+     userCounters.add(this.metricContext.contextAwareCounter(proxyCounterName));
+   }
+
+   try {
+     String requesterListJson = DagManagerUtils.getSerializedRequesterList(dagNode);
+     if (StringUtils.isNotEmpty(requesterListJson)) {
+       for (ServiceRequester serviceRequester : RequesterService.deserialize(requesterListJson)) {
+         String requesterCounterName = MetricRegistry.name(
+             ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, serviceRequester.getName());
+         userCounters.add(this.metricContext.contextAwareCounter(requesterCounterName));
+       }
+     }
+   } catch (IOException e) {
+     log.error("Error while fetching requester list.", e);
+   }
+
+   return userCounters;
+ }
+
+ /**
+  * Returns the running-flows counter named after the spec executor of the given job node.
+  *
+  * @param dagNode the job node whose spec executor name is used in the counter name
+  * @return the counter obtained from the metric context for that name
+  */
+ private ContextAwareCounter getRunningJobsCounterForExecutor(Dag.DagNode<JobExecutionPlan> dagNode) {
+   String counterName = MetricRegistry.name(
+       ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+       DagManagerUtils.getSpecExecutorName(dagNode),
+       ServiceMetricNames.RUNNING_FLOWS_COUNTER);
+   return this.metricContext.contextAwareCounter(counterName);
+ }
+
+
+ /**
+  * Returns the meter cached in {@code meterMap} for a flow group, creating and caching it on first use.
+  *
+  * @param flowGroup the flow group; used as the map key and as the middle part of the metric name
+  * @param meterName the trailing part of the metric name
+  * @param meterMap the per-group cache to read from and populate
+  * @return the cached or newly created meter
+  */
+ private ContextAwareMeter getGroupMeterForDag(String flowGroup, String meterName,
+     Map<String, ContextAwareMeter> meterMap) {
+   return meterMap.computeIfAbsent(flowGroup, missingGroup -> {
+     String groupMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, missingGroup, meterName);
+     return metricContext.contextAwareMeter(groupMeterName);
+   });
+ }
+
+ /**
+  * Returns the meter cached in {@code meterMap} for the spec executor of a job node, creating and caching it on
+  * first use. Per-executor meters help detect issues with the specExecutor itself.
+  *
+  * @param dagNode the job node whose spec executor name is the map key and the middle part of the metric name
+  * @param meterName the trailing part of the metric name
+  * @param meterMap the per-executor cache to read from and populate
+  * @return the cached or newly created meter
+  */
+ private ContextAwareMeter getExecutorMeterForDag(Dag.DagNode<JobExecutionPlan> dagNode, String meterName,
+     Map<String, ContextAwareMeter> meterMap) {
+   return meterMap.computeIfAbsent(DagManagerUtils.getSpecExecutorName(dagNode), missingExecutor -> {
+     String executorMeterName =
+         MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, missingExecutor, meterName);
+     return metricContext.contextAwareMeter(executorMeterName);
+   });
+ }
+
+
+ /**
+  * Builds the filter matching metric names made of the Gobblin service prefix, a dot, any characters, a dot, and
+  * the running-status suffix.
+  *
+  * @return a new regex-based metric name filter
+  */
+ @VisibleForTesting
+ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
+   String runningStatusPattern =
+       ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "\\..*\\." + ServiceMetricNames.RUNNING_STATUS;
+   return new MetricNameRegexFilter(runningStatusPattern);
+ }
+
+ /**
+  * Removes every metric matched by {@link #getMetricsFilterForDagManager()} from the RootMetricContext.
+  *
+  * <p>The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM
+  * deactivation-reactivation, whereas the RootMetricContext is a (persistent) singleton. Dropping the metrics here
+  * avoids the IllegalArgumentException the RootMetricContext raises on (re-)adding a metric it already knows, which
+  * a new DMThread would otherwise hit (in DagManagerThread::initialize) when running post-re-enablement.
+  */
+ public void cleanup() {
+   MetricNameRegexFilter dagManagerMetricsFilter = getMetricsFilterForDagManager();
+   RootMetricContext.get().removeMatching(dagManagerMetricsFilter);
+ }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 216e28525..b414a76e6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -71,6 +72,7 @@ public class DagManagerTest {
private DagStateStore _dagStateStore;
private DagStateStore _failedDagStateStore;
private JobStatusRetriever _jobStatusRetriever;
+ private DagManagerMetrics _dagManagerMetrics;
private DagManager.DagManagerThread _dagManagerThread;
private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
private LinkedBlockingQueue<String> cancelQueue;
@@ -99,9 +101,10 @@ public class DagManagerTest {
Config quotaConfig = ConfigFactory.empty()
.withValue(UserQuotaManager.PER_USER_QUOTA,
ConfigValueFactory.fromAnyRef("user:1"));
this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
+ this._dagManagerMetrics = new DagManagerMetrics(metricContext);
+ this._dagManagerMetrics.activate();
this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
_failedDagStateStore, queue, cancelQueue,
- resumeQueue, true, new HashSet<>(),
metricContext.contextAwareMeter("successMeter"),
- metricContext.contextAwareMeter("failedMeter"), new HashMap<>(),
START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
+ resumeQueue, true, new HashSet<>(), this._dagManagerMetrics,
START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);
@@ -703,15 +706,15 @@ public class DagManagerTest {
// The start time should be 16 minutes ago, which is past the start SLA so
the job should be cancelled
Iterator<JobStatus> jobStatusIterator1 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, flowExecutionId - Duration.ofMinutes(16).toMillis());
// This is for the second Dag that does not match the SLA so should
schedule normally
Iterator<JobStatus> jobStatusIterator2 =
getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0,
flowGroup1, String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 10 * 60 * 1000);
+ false, flowExecutionId - Duration.ofMinutes(10).toMillis());
// Let the first job get reported as cancel due to SLA kill on start and
clean up
Iterator<JobStatus> jobStatusIterator3 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.CANCELLED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, flowExecutionId - Duration.ofMinutes(16).toMillis());
// Cleanup the running job that is scheduled normally
Iterator<JobStatus> jobStatusIterator4 =
getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0,
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
@@ -751,6 +754,8 @@ public class DagManagerTest {
@Test (dependsOnMethods = "testJobStartSLAKilledDag")
public void testJobKilledSLAMetricsArePerExecutor() throws
URISyntaxException, IOException {
long flowExecutionId = System.currentTimeMillis();
+ // The start time should be 16 minutes ago, which is past the start SLA so
the job should be cancelled
+ long startOrchestrationTime = flowExecutionId -
Duration.ofMinutes(16).toMillis();
Config executorOneConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef("executorOne"))
.withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
ConfigValueFactory.fromAnyRef(flowExecutionId));
@@ -758,20 +763,23 @@ public class DagManagerTest {
List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "user",
executorOneConfig);
dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "user",
executorTwoConfig));
+ String allSlaKilledMeterName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+ long previousSlaKilledCount =
metricContext.getParent().get().getMeters().get(allSlaKilledMeterName) == null
? 0 :
+
metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount();
+
//Add a dag to the queue of dags
this.queue.offer(dagList.get(0));
this.queue.offer(dagList.get(1));
this.queue.offer(dagList.get(2));;
- // The start time should be 16 minutes ago, which is past the start SLA so
the job should be cancelled
Iterator<JobStatus> jobStatusIterator1 =
getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0",
String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, startOrchestrationTime);
Iterator<JobStatus> jobStatusIterator2 =
getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0",
"group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, startOrchestrationTime);
Iterator<JobStatus> jobStatusIterator3 =
getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0",
"group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, startOrchestrationTime);
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
@@ -779,15 +787,18 @@ public class DagManagerTest {
thenReturn(jobStatusIterator2).
thenReturn(jobStatusIterator3);
- // Run the thread once. All 3 jobs should be emitted an sla exceeded event
+ // Run the thread once. All 3 jobs should be emitted an SLA exceeded event
this._dagManagerThread.run();
String slakilledMeterName1 =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne",
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
String slakilledMeterName2 =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo",
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(),
2);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(),
1);
// Cleanup
this._dagManagerThread.run();
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount(),
previousSlaKilledCount + 3);
+
Assert.assertEquals(this.dags.size(), 0);
Assert.assertEquals(this.jobToDag.size(), 0);
Assert.assertEquals(this.dagToJobs.size(), 0);
@@ -1053,6 +1064,157 @@ public class DagManagerTest {
Assert.assertEquals(metricContext.getParent().get().getGauges().get(flowStateGaugeName1).getValue(),
DagManager.FlowState.SUCCESSFUL.value);
}
+ @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc")
+ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException
{
+ long flowExecutionId = System.currentTimeMillis() -
Duration.ofMinutes(20).toMillis();
+ Config executorOneConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef("executorOne"))
+ .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
ConfigValueFactory.fromAnyRef(flowExecutionId))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME,
ConfigValueFactory.fromAnyRef(10));
+ Config executorTwoConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef("executorTwo"))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME,
ConfigValueFactory.fromAnyRef(10));
+ List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser",
executorOneConfig);
+ dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser",
executorTwoConfig));
+
+ String allSlaKilledMeterName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+ long previousSlaKilledCount =
metricContext.getParent().get().getMeters().get(allSlaKilledMeterName) == null
? 0 :
+
metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount();
+
+ //Add a dag to the queue of dags
+ this.queue.offer(dagList.get(0));
+ this.queue.offer(dagList.get(1));
+ this.queue.offer(dagList.get(2));;
+ // Set orchestration time to be 20 minutes in the past, the job should be
marked as SLA killed
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0",
String.valueOf(ExecutionStatus.RUNNING),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1",
String.valueOf(ExecutionStatus.RUNNING),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2",
String.valueOf(ExecutionStatus.RUNNING),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockJobStatus("flow0", "flow0", flowExecutionId, "job0", "group0",
String.valueOf(ExecutionStatus.CANCELLED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator5 =
+ getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1",
String.valueOf(ExecutionStatus.CANCELLED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator6 =
+ getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2",
String.valueOf(ExecutionStatus.CANCELLED),
+ false, flowExecutionId);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator4).
+ thenReturn(jobStatusIterator5).
+ thenReturn(jobStatusIterator6);
+
+ // Run the thread twice. All 3 jobs should emit an SLA exceeded event
+ this._dagManagerThread.run();
+ this._dagManagerThread.run();
+
+ String slakilledMeterName1 =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne",
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+ String slakilledMeterName2 =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo",
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+ String failedFlowGauge =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
"group1","flow1", ServiceMetricNames.RUNNING_STATUS);
+
+ String slakilledGroupName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0",
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(),
2);
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(),
1);
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledGroupName).getCount(),
1);
+ // Cleanup
+ this._dagManagerThread.run();
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount(),
previousSlaKilledCount + 3);
+
Assert.assertEquals(metricContext.getParent().get().getGauges().get(failedFlowGauge).getValue(),
-1);
+
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
+ }
+
+ @Test (dependsOnMethods = "testJobSlaKilledMetrics")
+ public void testPerExecutorMetricsSuccessFails() throws URISyntaxException,
IOException {
+ long flowExecutionId = System.currentTimeMillis();
+ Config executorOneConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef("executorOne"))
+ .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
ConfigValueFactory.fromAnyRef(flowExecutionId));
+ Config executorTwoConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef("executorTwo"));
+ List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser",
executorOneConfig);
+ dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser",
executorTwoConfig));
+ // Get global metric count before any changes are applied
+ String allSuccessfulFlowsMeterName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.SUCCESSFUL_FLOW_METER);
+ long previousSuccessCount =
metricContext.getParent().get().getMeters().get(allSuccessfulFlowsMeterName) ==
null ? 0 :
+
metricContext.getParent().get().getMeters().get(allSuccessfulFlowsMeterName).getCount();
+ String previousJobSentToExecutorMeterName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne",
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR);
+ long previousJobSentToExecutorCount =
metricContext.getParent().get().getMeters().get(previousJobSentToExecutorMeterName)
== null ? 0 :
+
metricContext.getParent().get().getMeters().get(previousJobSentToExecutorMeterName).getCount();
+
+ //Add a dag to the queue of dags
+ this.queue.offer(dagList.get(0));
+ this.queue.offer(dagList.get(1));
+ this.queue.offer(dagList.get(2));;
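+ // Each job is first reported as ORCHESTRATED with the current time as its start time, so no job is expected
+ // to be cancelled for exceeding the start SLA; the jobs then finish as COMPLETE (flow0, flow2) or FAILED (flow1)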
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus( "flow0", "group0", flowExecutionId, "job0",
"group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0",
"group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0",
"group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockJobStatus( "flow0", "flow0", flowExecutionId+1, "job0",
"group0", String.valueOf(ExecutionStatus.COMPLETE),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator5 =
+ getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0",
"group1", String.valueOf(ExecutionStatus.FAILED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator6 =
+ getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0",
"group2", String.valueOf(ExecutionStatus.COMPLETE),
+ false, flowExecutionId);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow0"),
Mockito.eq("group0"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator4);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow1"),
Mockito.eq("group1"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator5);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow2"),
Mockito.eq("group2"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator6);
+
+ this._dagManagerThread.run();
+
+ String slaSuccessfulFlowsExecutorOneMeterName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne",
ServiceMetricNames.SUCCESSFUL_FLOW_METER);
+ String slaFailedFlowsExecutorOneMeterName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne",
ServiceMetricNames.FAILED_FLOW_METER);
+ String failedFlowGauge =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1",
"flow1", ServiceMetricNames.RUNNING_STATUS);
+
+ this._dagManagerThread.run();
+
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slaSuccessfulFlowsExecutorOneMeterName).getCount(),
1);
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slaFailedFlowsExecutorOneMeterName).getCount(),
1);
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSuccessfulFlowsMeterName).getCount(),
previousSuccessCount + 2);
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(previousJobSentToExecutorMeterName).getCount(),
previousJobSentToExecutorCount + 2);
+
Assert.assertEquals(metricContext.getParent().get().getGauges().get(failedFlowGauge).getValue(),
-1);
+ // Cleanup
+ this._dagManagerThread.run();
+
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
+ }
+
+
@AfterClass
public void cleanUp() throws Exception {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/TestServiceMetrics.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/TestServiceMetrics.java
index 42113829b..67188e1e8 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/TestServiceMetrics.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/TestServiceMetrics.java
@@ -29,7 +29,7 @@ public class TestServiceMetrics {
@Test
public void matchesTest() {
- MetricNameRegexFilter metricNameRegexForDagManager =
DagManager.getMetricsFilterForDagManager();
+ MetricNameRegexFilter metricNameRegexForDagManager =
DagManagerMetrics.getMetricsFilterForDagManager();
Assert.assertTrue(metricNameRegexForDagManager.matches("GobblinService.testGroup.testFlow.RunningStatus",
mock(Metric.class)));
Assert.assertTrue(metricNameRegexForDagManager.matches("GobblinService.test..RunningStatus",
mock(Metric.class)));
Assert.assertFalse(metricNameRegexForDagManager.matches("test3.RunningStatus",
mock(Metric.class)));