phet commented on code in PR #3502:
URL: https://github.com/apache/gobblin/pull/3502#discussion_r866250207
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -457,7 +460,8 @@ public static class DagManagerThread implements Runnable {
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
- ContextAwareMeter allFailedMeter, Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) {
+ ContextAwareMeter allFailedMeter, Map<String, ContextAwareMeter> startSlaExceededMeters,
Review Comment:
not mandatory now, but seeing that there are already three `ContextAwareMeter`-related
params makes me wonder whether a higher level of abstraction might be in order
(to collect them together).
perhaps wait and see?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1098,6 +1105,19 @@ private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Ma
group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
}
+ /**
+ * Used to track metrics for different specExecutors to detect issues with the specExecutor itself
+ * @param dagNode
+ * @param meterName
+ * @param meterMap
+ * @return
+ */
+ private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+ String executorName = dagNode.getValue().getSpecExecutor().getUri().toString();
+ return meterMap.computeIfAbsent(executorName,
+ executorUri -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, executorUri, meterName)));
Review Comment:
to turn this on its head: the map you're carrying around and passing in
here is basically just function memoization for this `computeIfAbsent` call.
up to you, but I might raise the level of abstraction by presenting this as
a cache of `ContextAwareMeter`s that delegates read-through (initialization) to
the `MetricRegistry`.
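A minimal sketch of what such a read-through cache could look like, for illustration only. `ReadThroughCache` and its `loader` parameter are hypothetical names, not existing Gobblin classes; the sketch is generic so that it stays self-contained and does not depend on `ContextAwareMeter` or `MetricRegistry`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Hypothetical read-through cache: a value is created by the loader on the
 * first lookup of a key and reused on every later lookup of that key.
 */
final class ReadThroughCache<K, V> {
  private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();
  private final Function<K, V> loader;

  ReadThroughCache(Function<K, V> loader) {
    this.loader = loader;
  }

  /** Returns the cached value for the key; the loader runs at most once per key. */
  V get(K key) {
    // ConcurrentHashMap.computeIfAbsent is atomic, so concurrent callers
    // asking for the same key all receive the same instance.
    return cache.computeIfAbsent(key, loader);
  }

  /** Number of keys initialized so far. */
  int size() {
    return cache.size();
  }
}
```

Under this assumption, callers would no longer pass the map around: the loader would wrap the existing `metricContext.contextAwareMeter(MetricRegistry.name(...))` lambda, and `getExecutorMeterForDag` would reduce to a single `get(executorName)` call.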
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -357,6 +357,8 @@ public synchronized void setActive(boolean active) {
ContextAwareMeter allSuccessfulMeter = null;
ContextAwareMeter allFailedMeter = null;
+ // TODO: create a map of RatioGauges for success/failed per executor, will require preprocessing of available executors
+ Map<String, ContextAwareMeter> startSlaExceededMeters = Maps.newConcurrentMap();
Review Comment:
it's not clear what the `String` key represents, and hence how the TODO relates
to what's implemented now.
my naming convention (clearly not what's used already in the project) is to
name maps with a `byX` suffix, so e.g. `startSlaExceededMetersByExecutorId`
UPDATE: now, after reading the entire PR, I understand the TODO as the
intent to expand into other per-executor metrics. the way you phrased it at
the implementation level initially distracted me. so then, two Qs:
a. why is the pre-processing required, rather than the JIT
initialization/vivification you do now (e.g. the `computeIfAbsent`)?
b. do you have any other per-executor stats in mind?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1098,6 +1105,19 @@ private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Ma
group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
}
+ /**
+ * Used to track metrics for different specExecutors to detect issues with the specExecutor itself
+ * @param dagNode
+ * @param meterName
+ * @param meterMap
+ * @return
+ */
+ private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+ String executorName = dagNode.getValue().getSpecExecutor().getUri().toString();
Review Comment:
if we lack a `DagManagerUtils.getExecutorName`, should we add one?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -736,6 +736,9 @@ public void testJobStartSLAKilledDag() throws URISyntaxException, IOException {
Assert.assertEquals(this.dagToJobs.size(), 1);
Assert.assertTrue(this.dags.containsKey(dagId1));
+ String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), 1);
Review Comment:
would it be worth verifying that meters are per-executor, not a single
aggregation?
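A rough sketch of the property such a check would pin down, using a stand-in rather than the real metrics classes. `PerExecutorCounters` is a hypothetical name, and `LongAdder` stands in for `ContextAwareMeter` so the example stays self-contained; the executor URIs are made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for a per-executor meter map; LongAdder replaces ContextAwareMeter. */
final class PerExecutorCounters {
  private final Map<String, LongAdder> countersByExecutorUri = new ConcurrentHashMap<>();

  /** Records one event against the given executor, creating its counter on first use. */
  void mark(String executorUri) {
    countersByExecutorUri.computeIfAbsent(executorUri, uri -> new LongAdder()).increment();
  }

  /** Returns the count for one executor, or 0 if nothing was recorded for it. */
  long count(String executorUri) {
    LongAdder counter = countersByExecutorUri.get(executorUri);
    return counter == null ? 0L : counter.sum();
  }
}
```

The equivalent assertion in `DagManagerTest` would presumably kill flows on two different executors and then check that each executor's meter name reports its own count, rather than one name holding the total.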