This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 283752c04 [GOBBLIN-1641] Add meter for sla exceeded flows (#3502)
283752c04 is described below

commit 283752c0461416277f739add8327440b60fbd540
Author: William Lo <[email protected]>
AuthorDate: Wed May 11 13:48:40 2022 -0700

    [GOBBLIN-1641] Add meter for sla exceeded flows (#3502)
    
    * Add meter for sla exceeded flows
    
    * fix tests
    
    * Fix test nullpointer
    
    * Address review + augment tests
---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  1 +
 .../service/modules/orchestration/DagManager.java  | 28 +++++++++--
 .../modules/orchestration/DagManagerUtils.java     |  4 ++
 .../modules/orchestration/DagManagerTest.java      | 55 ++++++++++++++++++++--
 4 files changed, 81 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index fb3442559..3bd4083a1 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -41,6 +41,7 @@ public class ServiceMetricNames {
   public static final String DELETE_FLOW_METER = "DeleteFlow";
   public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
   public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
+  public static final String START_SLA_EXCEEDED_FLOWS_METER = 
"StartSLAExceededFlows";
   public static final String FAILED_FLOW_METER = "FailedFlows";
   public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + 
".ScheduledFlows";
   public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX 
+ ".NonScheduledFlows";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index eeb6b70b3..d106aaddb 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -357,6 +357,8 @@ public class DagManager extends AbstractIdleService {
 
         ContextAwareMeter allSuccessfulMeter = null;
         ContextAwareMeter allFailedMeter = null;
+        // TODO: create a map of RatioGauges for success/failed per executor, 
will require preprocessing of available executors
+        Map<String, ContextAwareMeter> startSlaExceededMeters = 
Maps.newConcurrentMap();
 
         if (instrumentationEnabled) {
           MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
@@ -381,7 +383,7 @@ public class DagManager extends AbstractIdleService {
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new 
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
               runQueue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, failedDagIds, allSuccessfulMeter,
-              allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager, 
i);
+              allFailedMeter, startSlaExceededMeters, 
this.defaultJobStartSlaTimeMillis, quotaManager, i);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, 
this.pollingInterval, TimeUnit.SECONDS);
         }
@@ -442,6 +444,7 @@ public class DagManager extends AbstractIdleService {
     private final ContextAwareMeter allFailedMeter;
     private static final Map<String, ContextAwareMeter> groupSuccessfulMeters 
= Maps.newConcurrentMap();
     private static final Map<String, ContextAwareMeter> groupFailureMeters = 
Maps.newConcurrentMap();
+    private final Map<String, ContextAwareMeter> startSlaExceededMeters;
     private final UserQuotaManager quotaManager;
     private final JobStatusRetriever jobStatusRetriever;
     private final DagStateStore dagStateStore;
@@ -457,7 +460,8 @@ public class DagManager extends AbstractIdleService {
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> 
cancelQueue, BlockingQueue<String> resumeQueue,
         boolean instrumentationEnabled, Set<String> failedDagIds, 
ContextAwareMeter allSuccessfulMeter,
-        ContextAwareMeter allFailedMeter, Long defaultJobStartSla, 
UserQuotaManager quotaManager, int dagMangerThreadId) {
+        ContextAwareMeter allFailedMeter, Map<String, ContextAwareMeter> 
startSlaExceededMeters,
+        Long defaultJobStartSla, UserQuotaManager quotaManager, int 
dagMangerThreadId) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
@@ -469,6 +473,7 @@ public class DagManager extends AbstractIdleService {
       this.allFailedMeter = allFailedMeter;
       this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
       this.quotaManager = quotaManager;
+      this.startSlaExceededMeters = startSlaExceededMeters;
 
       if (instrumentationEnabled) {
         this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
@@ -805,7 +810,9 @@ public class DagManager extends AbstractIdleService {
         String dagId = DagManagerUtils.generateDagId(node);
         
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
         this.dags.get(dagId).setMessage("Flow killed because no update 
received for " + timeOutForJobStart + " ms after orchestration");
-
+        if (this.metricContext != null) {
+          this.getExecutorMeterForDag(node, 
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER, 
startSlaExceededMeters).mark();
+        }
         return true;
       } else {
         return false;
@@ -1061,7 +1068,7 @@ public class DagManager extends AbstractIdleService {
           MetricRegistry.name(
               ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
               ServiceMetricNames.RUNNING_FLOWS_COUNTER,
-              dagNode.getValue().getSpecExecutor().getUri().toString()));
+              DagManagerUtils.getSpecExecutorName(dagNode)));
     }
 
     private List<ContextAwareCounter> 
getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) {
@@ -1098,6 +1105,19 @@ public class DagManager extends AbstractIdleService {
           group -> 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
 group, meterName)));
     }
 
+    /**
+     * Used to track metrics for different specExecutors to detect issues with 
the specExecutor itself. Returns the meter cached in {@code meterMap} for the node's executor, creating it on first use.
+     * @param dagNode node whose spec executor name (from DagManagerUtils.getSpecExecutorName) is used as the map key
+     * @param meterName last component of the metric name, placed after the service prefix and the executor name
+     * @param meterMap cache of meters keyed by executor name; filled lazily through computeIfAbsent
+     * @return the existing meter for the node's executor, or a new one obtained from metricContext and stored in meterMap
+     */
+    private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan> 
dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+      String executorName = DagManagerUtils.getSpecExecutorName(dagNode);
+      return meterMap.computeIfAbsent(executorName,
+          executorUri -> 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
 executorUri, meterName)));
+    }
+
     /**
      * Perform clean up. Remove a dag from the dagstore if the dag is complete 
and update internal state.
      */
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 17b4fd3e7..5ec95793a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -317,6 +317,10 @@ public class DagManagerUtils {
     return ConfigUtils.getBoolean(getDagJobConfig(dag), 
ConfigurationKeys.GOBBLIN_FLOW_ISADHOC,false);
   }
 
+  static String getSpecExecutorName(DagNode<JobExecutionPlan> dagNode) { // the executor "name" is its URI rendered as a string
+    return dagNode.getValue().getSpecExecutor().getUri().toString();
+  }
+
   static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter, 
Dag<JobExecutionPlan> dag, String flowEvent) {
     if (eventSubmitter.isPresent() && !dag.isEmpty()) {
       // Every dag node will contain the same flow metadata
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c098228ba..53f4d3284 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -101,7 +101,7 @@ public class DagManagerTest {
     this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
     this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, 
_failedDagStateStore, queue, cancelQueue,
         resumeQueue, true, new HashSet<>(), 
metricContext.contextAwareMeter("successMeter"),
-        metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT, 
_gobblinServiceQuotaManager, 0);
+        metricContext.contextAwareMeter("failedMeter"), new HashMap<>(), 
START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
 
     Field jobToDagField = 
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);
@@ -127,7 +127,7 @@ public class DagManagerTest {
   static List<Dag<JobExecutionPlan>> buildDagList(int numDags, String 
proxyUser, Config additionalConfig) throws URISyntaxException{
     List<Dag<JobExecutionPlan>> dagList = new ArrayList<>();
     for (int i = 0; i < numDags; i++) {
-      dagList.add(buildDag(Integer.toString(i),  System.currentTimeMillis(), 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 1,
+      dagList.add(buildDag(Integer.toString(i), System.currentTimeMillis(), 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 1,
           proxyUser, additionalConfig));
     }
     return dagList;
@@ -172,7 +172,8 @@ public class DagManagerTest {
       }
       JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
           withTemplate(new URI("job" + suffix)).build();
-      SpecExecutor specExecutor = 
MockedSpecExecutor.createDummySpecExecutor(new URI("job" + i));
+      SpecExecutor specExecutor = 
MockedSpecExecutor.createDummySpecExecutor(new URI(
+          ConfigUtils.getString(additionalConfig, 
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
       JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
       jobExecutionPlans.add(jobExecutionPlan);
     }
@@ -736,6 +737,9 @@ public class DagManagerTest {
     Assert.assertEquals(this.dagToJobs.size(), 1);
     Assert.assertTrue(this.dags.containsKey(dagId1));
 
+    String slakilledMeterName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", 
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+    
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(),
 1);
+
     // Cleanup
     this._dagManagerThread.run();
     this._dagManagerThread.run();
@@ -745,6 +749,51 @@ public class DagManagerTest {
   }
 
   @Test (dependsOnMethods = "testJobStartSLAKilledDag")
+  public void testJobKilledSLAMetricsArePerExecutor() throws 
URISyntaxException, IOException {
+    long flowExecutionId = System.currentTimeMillis();
+    Config executorOneConfig = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef("executorOne"))
+        .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
ConfigValueFactory.fromAnyRef(flowExecutionId));
+    Config executorTwoConfig = 
ConfigFactory.empty().withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
 ConfigValueFactory.fromAnyRef("executorTwo"));
+    List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "user", 
executorOneConfig);
+    dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "user", 
executorTwoConfig));
+
+    // Add all three dags (two built with executorOneConfig, one with executorTwoConfig) to the queue
+    this.queue.offer(dagList.get(0));
+    this.queue.offer(dagList.get(1));
+    this.queue.offer(dagList.get(2));;
+    // The start time should be 16 minutes ago, which is past the start SLA so 
the job should be cancelled
+    Iterator<JobStatus> jobStatusIterator1 =
+        getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0", 
String.valueOf(ExecutionStatus.ORCHESTRATED),
+            false, flowExecutionId - 16 * 60 * 1000);
+    Iterator<JobStatus> jobStatusIterator2 =
+        getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", 
"group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
+            false, flowExecutionId - 16 * 60 * 1000);
+    Iterator<JobStatus> jobStatusIterator3 =
+        getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", 
"group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
+            false, flowExecutionId - 16 * 60 * 1000);
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator2).
+        thenReturn(jobStatusIterator3);
+
+    // Run the thread once. An SLA exceeded event should be emitted for all 3 jobs
+    this._dagManagerThread.run();
+
+    String slakilledMeterName1 = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", 
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+    String slakilledMeterName2 = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", 
ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+    
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(),
 2);
+    
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(),
 1);
+    // Cleanup: after one more run no dags or job mappings should remain tracked
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
+  }
+
+  @Test (dependsOnMethods = "testJobKilledSLAMetricsArePerExecutor")
   public void testDagManagerWithBadFlowSLAConfig() throws URISyntaxException, 
IOException {
     long flowExecutionId = System.currentTimeMillis();
     String flowGroup = "group0";

Reply via email to