phet commented on code in PR #3502:
URL: https://github.com/apache/gobblin/pull/3502#discussion_r866250207


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -457,7 +460,8 @@ public static class DagManagerThread implements Runnable {
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
         boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
-        ContextAwareMeter allFailedMeter, Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) {
+        ContextAwareMeter allFailedMeter, Map<String, ContextAwareMeter> startSlaExceededMeters,

Review Comment:
   not mandatory now, but seeing there are already three `ContextAwareMeter`-related params makes me wonder whether a higher level of abstraction might be in order (to collect them together).
   
   perhaps wait and see?
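if it helps the "wait and see" decision, here's a minimal sketch of what collecting them could look like. `DagManagerMeters` is a hypothetical parameter object (not something in the PR), and `CountingMeter` is a stand-in for `ContextAwareMeter` so the example compiles on its own:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for ContextAwareMeter: it only counts marks. */
final class CountingMeter {
  private final LongAdder count = new LongAdder();

  void mark() { count.increment(); }

  long getCount() { return count.sum(); }
}

/**
 * Hypothetical parameter object bundling the meters that DagManagerThread
 * currently receives as separate constructor arguments.
 */
final class DagManagerMeters {
  final CountingMeter allSuccessfulMeter;
  final CountingMeter allFailedMeter;
  // keyed by the spec executor URI (string form), as in getExecutorMeterForDag
  final Map<String, CountingMeter> startSlaExceededMetersByExecutorUri;

  DagManagerMeters(CountingMeter allSuccessfulMeter, CountingMeter allFailedMeter,
      Map<String, CountingMeter> startSlaExceededMetersByExecutorUri) {
    this.allSuccessfulMeter = allSuccessfulMeter;
    this.allFailedMeter = allFailedMeter;
    this.startSlaExceededMetersByExecutorUri = startSlaExceededMetersByExecutorUri;
  }

  /** Convenience factory: fresh meters and an empty, thread-safe per-executor map. */
  static DagManagerMeters create() {
    return new DagManagerMeters(new CountingMeter(), new CountingMeter(), new ConcurrentHashMap<>());
  }
}
```

the constructor would then take a single `DagManagerMeters` argument, and adding the next per-executor meter wouldn't touch its signature again.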



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1098,6 +1105,19 @@ private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Ma
           group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
     }
 
+    /**
+     * Used to track metrics for different specExecutors to detect issues with the specExecutor itself
+     * @param dagNode
+     * @param meterName
+     * @param meterMap
+     * @return
+     */
+    private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+      String executorName = dagNode.getValue().getSpecExecutor().getUri().toString();
+      return meterMap.computeIfAbsent(executorName,
+          executorUri -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, executorUri, meterName)));

Review Comment:
   to turn this on its head, the map you're carrying around and passing into here is basically just function memoization for this `computeIfAbsent` call.
   
   up to you, but I might raise the level of abstraction by presenting this as a cache of `ContextAwareMeter`s that delegates read-through (initialization) to the `MetricRegistry`.
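a rough sketch of that shape, assuming a hypothetical `ReadThroughMeterCache` class. in `DagManager` the loader would presumably wrap `metricContext.contextAwareMeter(...)`; here it's any `Function<String, M>` so the sketch stands alone:

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Hypothetical read-through cache: callers ask for a meter by name only, and
 * the first request for a name delegates creation to the supplied loader.
 */
final class ReadThroughMeterCache<M> {
  private final ConcurrentMap<String, M> metersByName = new ConcurrentHashMap<>();
  private final Function<String, M> loader;

  ReadThroughMeterCache(Function<String, M> loader) {
    this.loader = Objects.requireNonNull(loader, "loader");
  }

  /** Returns the cached meter for this name, invoking the loader at most once per name. */
  M get(String name) {
    return metersByName.computeIfAbsent(name, loader);
  }

  int size() {
    return metersByName.size();
  }
}
```

one possible side benefit: keying by the full metric name (rather than only the executor URI) would let a single cache serve every meter type, whereas `getExecutorMeterForDag` as written returns whatever meter is already cached for that executor, regardless of the `meterName` passed in.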



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -357,6 +357,8 @@ public synchronized void setActive(boolean active) {
 
         ContextAwareMeter allSuccessfulMeter = null;
         ContextAwareMeter allFailedMeter = null;
+        // TODO: create a map of RatioGauges for success/failed per executor, will require preprocessing of available executors
+        Map<String, ContextAwareMeter> startSlaExceededMeters = Maps.newConcurrentMap();

Review Comment:
   it's not clear what the `String` key reflects... and hence where the TODO fits against what's implemented now.
   
   my naming convention (clearly not what's already used in the project) is to name maps with a `byX` suffix, e.g. `startSlaExceededMetersByExecutorId`.
   
   UPDATE: now, after reading the entire PR, I understand the TODO as the intent to expand into other per-executor metrics. the way you phrased it at the implementation level initially distracted me. so then, two Qs:
   a. why is the pre-processing required, rather than the JIT initialization/vivification you do now (e.g. the `computeIfAbsent`)?
   b. any other per-executor stats in mind?
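to make (a) concrete, here's a sketch of JIT per-executor success/failure tracking with no up-front list of executors. `ExecutorOutcomeCounts` is hypothetical, and plain `LongAdder`s stand in for the meters/`RatioGauge`s the TODO mentions:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical per-executor success/failure tracker. Counters for an executor
 * are created the first time that executor reports an outcome, so no
 * pre-processing of available executors is needed.
 */
final class ExecutorOutcomeCounts {
  private static final class Counts {
    final LongAdder succeeded = new LongAdder();
    final LongAdder failed = new LongAdder();
  }

  private final ConcurrentMap<String, Counts> countsByExecutorUri = new ConcurrentHashMap<>();

  void recordSuccess(String executorUri) {
    countsByExecutorUri.computeIfAbsent(executorUri, k -> new Counts()).succeeded.increment();
  }

  void recordFailure(String executorUri) {
    countsByExecutorUri.computeIfAbsent(executorUri, k -> new Counts()).failed.increment();
  }

  /** Success ratio for the executor, or NaN if it has not reported any outcome yet. */
  double successRatio(String executorUri) {
    Counts c = countsByExecutorUri.get(executorUri);
    if (c == null) {
      return Double.NaN;
    }
    long ok = c.succeeded.sum();
    long total = ok + c.failed.sum();
    return total == 0 ? Double.NaN : (double) ok / total;
  }
}
```

returning NaN for an executor with no outcomes mirrors how Dropwizard's `RatioGauge.Ratio` reports a zero denominator, so a dashboard wouldn't need a list of executors in advance either.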



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1098,6 +1105,19 @@ private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Ma
           group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
     }
 
+    /**
+     * Used to track metrics for different specExecutors to detect issues with the specExecutor itself
+     * @param dagNode
+     * @param meterName
+     * @param meterMap
+     * @return
+     */
+    private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+      String executorName = dagNode.getValue().getSpecExecutor().getUri().toString();

Review Comment:
   if we lack a `DagManagerUtils.getExecutorName`, should we add one?
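a sketch of such a helper, assuming a hypothetical `getExecutorName(DagNode)` signature. the `*Stub` classes stand in for `DagNode`, `JobExecutionPlan`, and `SpecExecutor`, and `DagManagerUtilsSketch` for `DagManagerUtils`, so it compiles alone:

```java
import java.net.URI;

/** Minimal stand-ins for the Gobblin types involved (hypothetical, for illustration only). */
final class SpecExecutorStub {
  private final URI uri;
  SpecExecutorStub(URI uri) { this.uri = uri; }
  URI getUri() { return uri; }
}

final class JobExecutionPlanStub {
  private final SpecExecutorStub specExecutor;
  JobExecutionPlanStub(SpecExecutorStub specExecutor) { this.specExecutor = specExecutor; }
  SpecExecutorStub getSpecExecutor() { return specExecutor; }
}

final class DagNodeStub {
  private final JobExecutionPlanStub value;
  DagNodeStub(JobExecutionPlanStub value) { this.value = value; }
  JobExecutionPlanStub getValue() { return value; }
}

final class DagManagerUtilsSketch {
  private DagManagerUtilsSketch() {}

  /** Hypothetical helper: the spec executor's URI string, used as the per-executor metric key. */
  static String getExecutorName(DagNodeStub dagNode) {
    return dagNode.getValue().getSpecExecutor().getUri().toString();
  }
}
```

call sites like `getExecutorMeterForDag` would then shrink to a single helper call, and the choice of "URI string as executor name" would live in one place.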



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -736,6 +736,9 @@ public void testJobStartSLAKilledDag() throws URISyntaxException, IOException {
     Assert.assertEquals(this.dagToJobs.size(), 1);
     Assert.assertTrue(this.dags.containsKey(dagId1));
 
+    String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), 1);

Review Comment:
   would it be worth verifying that meters are per-executor, not a single aggregation?
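e.g. mark the start-SLA meter for two different executors and assert each count separately. a self-contained sketch using a hypothetical `FakeRegistry` in place of the metric context; the prefix and meter-name constants are placeholders, not the real `ServiceMetricNames` values:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical stand-in for the metric registry: meters are keyed by a
 * dot-joined name, mirroring how MetricRegistry.name(...) joins its parts.
 */
final class FakeRegistry {
  static final String PREFIX = "service";                       // placeholder prefix
  static final String START_SLA_EXCEEDED = "startSlaExceeded";  // placeholder meter name

  private final Map<String, LongAdder> meters = new ConcurrentHashMap<>();

  static String name(String executor, String meter) {
    return String.join(".", PREFIX, executor, meter);
  }

  void mark(String executor, String meter) {
    meters.computeIfAbsent(name(executor, meter), k -> new LongAdder()).increment();
  }

  /** Count for one executor's meter; 0 if that meter was never created. */
  long count(String executor, String meter) {
    LongAdder a = meters.get(name(executor, meter));
    return a == null ? 0 : a.sum();
  }
}

final class PerExecutorMeterCheck {
  /** Two SLA misses on executor0 and one on executor1 must not be aggregated. */
  static FakeRegistry runAndVerify() {
    FakeRegistry registry = new FakeRegistry();
    registry.mark("executor0", FakeRegistry.START_SLA_EXCEEDED);
    registry.mark("executor0", FakeRegistry.START_SLA_EXCEEDED);
    registry.mark("executor1", FakeRegistry.START_SLA_EXCEEDED);
    assertCount(registry.count("executor0", FakeRegistry.START_SLA_EXCEEDED), 2);
    assertCount(registry.count("executor1", FakeRegistry.START_SLA_EXCEEDED), 1);
    return registry;
  }

  private static void assertCount(long actual, long expected) {
    if (actual != expected) {
      throw new AssertionError("expected " + expected + " but got " + actual);
    }
  }
}
```

in `DagManagerTest` itself this would presumably become one `Assert.assertEquals` per executor-specific meter name, using the same `getMeters().get(...)` lookup as the existing assertion.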



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.