This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 283752c04 [GOBBLIN-1641] Add meter for sla exceeded flows (#3502)
283752c04 is described below
commit 283752c0461416277f739add8327440b60fbd540
Author: William Lo <[email protected]>
AuthorDate: Wed May 11 13:48:40 2022 -0700
[GOBBLIN-1641] Add meter for sla exceeded flows (#3502)
* Add meter for sla exceeded flows
* fix tests
* Fix test nullpointer
* Address review + augment tests
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 1 +
.../service/modules/orchestration/DagManager.java | 28 +++++++++--
.../modules/orchestration/DagManagerUtils.java | 4 ++
.../modules/orchestration/DagManagerTest.java | 55 ++++++++++++++++++++--
4 files changed, 81 insertions(+), 7 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index fb3442559..3bd4083a1 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -41,6 +41,7 @@ public class ServiceMetricNames {
public static final String DELETE_FLOW_METER = "DeleteFlow";
public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
+ public static final String START_SLA_EXCEEDED_FLOWS_METER =
"StartSLAExceededFlows";
public static final String FAILED_FLOW_METER = "FailedFlows";
public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX +
".ScheduledFlows";
public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX
+ ".NonScheduledFlows";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index eeb6b70b3..d106aaddb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -357,6 +357,8 @@ public class DagManager extends AbstractIdleService {
ContextAwareMeter allSuccessfulMeter = null;
ContextAwareMeter allFailedMeter = null;
+ // TODO: create a map of RatioGauges for success/failed per executor,
will require preprocessing of available executors
+ Map<String, ContextAwareMeter> startSlaExceededMeters =
Maps.newConcurrentMap();
if (instrumentationEnabled) {
MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
@@ -381,7 +383,7 @@ public class DagManager extends AbstractIdleService {
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
runQueue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, failedDagIds, allSuccessfulMeter,
- allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager,
i);
+ allFailedMeter, startSlaExceededMeters,
this.defaultJobStartSlaTimeMillis, quotaManager, i);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
}
@@ -442,6 +444,7 @@ public class DagManager extends AbstractIdleService {
private final ContextAwareMeter allFailedMeter;
private static final Map<String, ContextAwareMeter> groupSuccessfulMeters
= Maps.newConcurrentMap();
private static final Map<String, ContextAwareMeter> groupFailureMeters =
Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> startSlaExceededMeters;
private final UserQuotaManager quotaManager;
private final JobStatusRetriever jobStatusRetriever;
private final DagStateStore dagStateStore;
@@ -457,7 +460,8 @@ public class DagManager extends AbstractIdleService {
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, BlockingQueue<String> resumeQueue,
boolean instrumentationEnabled, Set<String> failedDagIds,
ContextAwareMeter allSuccessfulMeter,
- ContextAwareMeter allFailedMeter, Long defaultJobStartSla,
UserQuotaManager quotaManager, int dagMangerThreadId) {
+ ContextAwareMeter allFailedMeter, Map<String, ContextAwareMeter>
startSlaExceededMeters,
+ Long defaultJobStartSla, UserQuotaManager quotaManager, int
dagMangerThreadId) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
@@ -469,6 +473,7 @@ public class DagManager extends AbstractIdleService {
this.allFailedMeter = allFailedMeter;
this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
this.quotaManager = quotaManager;
+ this.startSlaExceededMeters = startSlaExceededMeters;
if (instrumentationEnabled) {
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
@@ -805,7 +810,9 @@ public class DagManager extends AbstractIdleService {
String dagId = DagManagerUtils.generateDagId(node);
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
this.dags.get(dagId).setMessage("Flow killed because no update
received for " + timeOutForJobStart + " ms after orchestration");
-
+ if (this.metricContext != null) {
+ this.getExecutorMeterForDag(node,
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER,
startSlaExceededMeters).mark();
+ }
return true;
} else {
return false;
@@ -1061,7 +1068,7 @@ public class DagManager extends AbstractIdleService {
MetricRegistry.name(
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.RUNNING_FLOWS_COUNTER,
- dagNode.getValue().getSpecExecutor().getUri().toString()));
+ DagManagerUtils.getSpecExecutorName(dagNode)));
}
private List<ContextAwareCounter>
getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) {
@@ -1098,6 +1105,19 @@ public class DagManager extends AbstractIdleService {
group ->
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
group, meterName)));
}
+ /**
+ * Returns the meter named {@code meterName} for the spec executor assigned to {@code dagNode}, creating it on
+ * first use. Tracking metrics per specExecutor helps detect issues with a specific executor itself.
+ *
+ * <p>A newly created meter is obtained from {@code metricContext} under the name
+ * {@code MetricRegistry.name(GOBBLIN_SERVICE_PREFIX, executorName, meterName)}, where the executor name is the
+ * spec executor's URI string ({@link DagManagerUtils#getSpecExecutorName}). {@code meterMap} is keyed by the
+ * executor name only (not by {@code meterName}), so each map should be dedicated to a single meter name.
+ * NOTE(review): dereferences {@code metricContext}, which appears to be set only when instrumentation is
+ * enabled; callers must check it is non-null first (the start-SLA caller does).
+ *
+ * @param dagNode the dag node whose spec executor selects the meter
+ * @param meterName the last component of the metric name, used only when a new meter is created
+ * @param meterMap cache of meters keyed by executor name; a missing entry is created and stored
+ * @return the cached or newly created meter for the node's spec executor
+ */
+ private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan>
dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+ String executorName = DagManagerUtils.getSpecExecutorName(dagNode);
+ return meterMap.computeIfAbsent(executorName,
+ executorUri ->
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
executorUri, meterName)));
+ }
+
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete
and update internal state.
*/
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 17b4fd3e7..5ec95793a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -317,6 +317,10 @@ public class DagManagerUtils {
return ConfigUtils.getBoolean(getDagJobConfig(dag),
ConfigurationKeys.GOBBLIN_FLOW_ISADHOC,false);
}
+ /**
+ * Returns the URI of the spec executor assigned to {@code dagNode}, as a string. DagManager uses this value
+ * as the per-executor component of metric names (running-flows counters and start-SLA-exceeded meters).
+ */
+ static String getSpecExecutorName(DagNode<JobExecutionPlan> dagNode) {
+ return dagNode.getValue().getSpecExecutor().getUri().toString();
+ }
+
static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter,
Dag<JobExecutionPlan> dag, String flowEvent) {
if (eventSubmitter.isPresent() && !dag.isEmpty()) {
// Every dag node will contain the same flow metadata
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c098228ba..53f4d3284 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -101,7 +101,7 @@ public class DagManagerTest {
this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
_failedDagStateStore, queue, cancelQueue,
resumeQueue, true, new HashSet<>(),
metricContext.contextAwareMeter("successMeter"),
- metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT,
_gobblinServiceQuotaManager, 0);
+ metricContext.contextAwareMeter("failedMeter"), new HashMap<>(),
START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);
@@ -127,7 +127,7 @@ public class DagManagerTest {
static List<Dag<JobExecutionPlan>> buildDagList(int numDags, String
proxyUser, Config additionalConfig) throws URISyntaxException{
List<Dag<JobExecutionPlan>> dagList = new ArrayList<>();
for (int i = 0; i < numDags; i++) {
- dagList.add(buildDag(Integer.toString(i), System.currentTimeMillis(),
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 1,
+ dagList.add(buildDag(Integer.toString(i), System.currentTimeMillis(),
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 1,
proxyUser, additionalConfig));
}
return dagList;
@@ -172,7 +172,8 @@ public class DagManagerTest {
}
JobSpec js = JobSpec.builder("test_job" +
suffix).withVersion(suffix).withConfig(jobConfig).
withTemplate(new URI("job" + suffix)).build();
- SpecExecutor specExecutor =
MockedSpecExecutor.createDummySpecExecutor(new URI("job" + i));
+ SpecExecutor specExecutor =
MockedSpecExecutor.createDummySpecExecutor(new URI(
+ ConfigUtils.getString(additionalConfig,
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js,
specExecutor);
jobExecutionPlans.add(jobExecutionPlan);
}
@@ -736,6 +737,9 @@ public class DagManagerTest {
Assert.assertEquals(this.dagToJobs.size(), 1);
Assert.assertTrue(this.dags.containsKey(dagId1));
+ String slakilledMeterName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0",
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(),
1);
+
// Cleanup
this._dagManagerThread.run();
this._dagManagerThread.run();
@@ -745,6 +749,51 @@ public class DagManagerTest {
}
@Test (dependsOnMethods = "testJobStartSLAKilledDag")
+ public void testJobKilledSLAMetricsArePerExecutor() throws URISyntaxException, IOException {
+ // Two dags run on "executorOne" and one on "executorTwo". All three exceed the job start SLA, so the
+ // per-executor START_SLA_EXCEEDED_FLOWS_METER must count 2 for executorOne and 1 for executorTwo.
+ long flowExecutionId = System.currentTimeMillis();
+ Config executorOneConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
+ .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId));
+ Config executorTwoConfig = ConfigFactory.empty().withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo"));
+ // buildDag reads SPECEXECUTOR_INSTANCE_URI_KEY from the additional config to choose each job's spec executor URI.
+ List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "user", executorOneConfig);
+ dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "user", executorTwoConfig));
+
+ // Add all three dags to the queue of dags
+ this.queue.offer(dagList.get(0));
+ this.queue.offer(dagList.get(1));
+ this.queue.offer(dagList.get(2));
+ // The start time is 16 minutes ago, which is past the start SLA, so every job should be cancelled
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId - 16 * 60 * 1000);
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", "group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId - 16 * 60 * 1000);
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", "group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId - 16 * 60 * 1000);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator3);
+
+ // Run the thread once. All 3 jobs should exceed their start SLA and be killed, marking their executor's meter
+ this._dagManagerThread.run();
+
+ String slakilledMeterName1 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+ String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1);
+ // Cleanup
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
+ }
+
+ @Test (dependsOnMethods = "testJobKilledSLAMetricsArePerExecutor")
public void testDagManagerWithBadFlowSLAConfig() throws URISyntaxException,
IOException {
long flowExecutionId = System.currentTimeMillis();
String flowGroup = "group0";