[ https://issues.apache.org/jira/browse/GOBBLIN-1641?focusedWorklogId=766897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-766897 ]
ASF GitHub Bot logged work on GOBBLIN-1641:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/May/22 19:55
Start Date: 05/May/22 19:55
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3502:
URL: https://github.com/apache/gobblin/pull/3502#discussion_r866250207
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -457,7 +460,8 @@ public static class DagManagerThread implements Runnable {
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
boolean instrumentationEnabled, Set<String> failedDagIds,
ContextAwareMeter allSuccessfulMeter,
- ContextAwareMeter allFailedMeter, Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) {
+ ContextAwareMeter allFailedMeter, Map<String, ContextAwareMeter> startSlaExceededMeters,
Review Comment:
not mandatory now, but seeing there are already three `ContextAwareMeter`-related
params makes me wonder whether a higher level of abstraction might be in order
(to collect them together).
perhaps wait and see?
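If this is picked up later, one option is to bundle the meter parameters into a single holder, so the `DagManagerThread` constructor takes one argument instead of three. The sketch below rests on several assumptions. `SimpleMeter` is a hypothetical stdlib stand-in for `ContextAwareMeter`, and `DagManagerMeters` is an illustrative name, not an existing Gobblin class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ContextAwareMeter: it only counts marks.
class SimpleMeter {
  private final AtomicLong count = new AtomicLong();

  void mark() {
    count.incrementAndGet();
  }

  long getCount() {
    return count.get();
  }
}

// Hypothetical holder that collects the meter-related constructor params in one place.
final class DagManagerMeters {
  final SimpleMeter allSuccessfulMeter;
  final SimpleMeter allFailedMeter;
  // Keyed by the spec executor URI string (same key as the PR's map).
  final Map<String, SimpleMeter> startSlaExceededMetersByExecutorUri;

  DagManagerMeters(SimpleMeter allSuccessfulMeter, SimpleMeter allFailedMeter,
      Map<String, SimpleMeter> startSlaExceededMetersByExecutorUri) {
    this.allSuccessfulMeter = allSuccessfulMeter;
    this.allFailedMeter = allFailedMeter;
    this.startSlaExceededMetersByExecutorUri = startSlaExceededMetersByExecutorUri;
  }

  // Convenience factory: fresh aggregate meters and an empty, thread-safe per-executor map.
  static DagManagerMeters create() {
    return new DagManagerMeters(new SimpleMeter(), new SimpleMeter(), new ConcurrentHashMap<>());
  }
}
```

New per-executor meters would then be added to the holder rather than to the constructor signature.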
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1098,6 +1105,19 @@ private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Ma
group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
}
+ /**
+ * Used to track metrics for different specExecutors to detect issues with the specExecutor itself
+ * @param dagNode
+ * @param meterName
+ * @param meterMap
+ * @return
+ */
+ private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+ String executorName = dagNode.getValue().getSpecExecutor().getUri().toString();
+ return meterMap.computeIfAbsent(executorName,
+ executorUri -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, executorUri, meterName)));
Review Comment:
to turn this on its head, the map you're carrying around and passing into
here is basically just function memoization for this `computeIfAbsent` call.
up to you, but I might raise the level of abstraction by presenting this as
a cache of `ContextAwareMeter`s that delegates read-through (initialization) to
the `MetricRegistry`.
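Here is a minimal sketch of the suggested read-through cache, assuming a generic meter type and a caller-supplied loader. The cache owns the map and delegates misses to the loader. In `DagManager`, the loader would presumably be something like `executorUri -> metricContext.contextAwareMeter(MetricRegistry.name(...))`, with one cache per meter name, just as `startSlaExceededMeters` is already one map per meter. `MeterCache` is a hypothetical name. `ConcurrentHashMap.computeIfAbsent` is the same memoizing call the PR already uses.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical read-through cache: callers ask for a meter by key and never handle the map.
final class MeterCache<M> {
  private final ConcurrentMap<String, M> metersByKey = new ConcurrentHashMap<>();
  private final Function<String, M> loader;

  // On ConcurrentHashMap, computeIfAbsent is atomic, so the loader runs at most once per key.
  MeterCache(Function<String, M> loader) {
    this.loader = loader;
  }

  // Returns the cached meter for key, creating it via the loader on first access.
  M get(String key) {
    return metersByKey.computeIfAbsent(key, loader);
  }

  int size() {
    return metersByKey.size();
  }
}
```

Callers would then hold a `MeterCache` per meter name instead of passing a raw `Map` into `getExecutorMeterForDag`.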
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -357,6 +357,8 @@ public synchronized void setActive(boolean active) {
ContextAwareMeter allSuccessfulMeter = null;
ContextAwareMeter allFailedMeter = null;
+ // TODO: create a map of RatioGauges for success/failed per executor, will require preprocessing of available executors
+ Map<String, ContextAwareMeter> startSlaExceededMeters = Maps.newConcurrentMap();
Review Comment:
not clear what the `String` key reflects... hence unclear where the TODO fits
against what's implemented now
my naming convention (clearly not what's used already in the project) is to
name maps with a `byX` suffix, so e.g. `startSlaExceededMetersByExecutorId`
UPDATE: now, after reading the entire PR, I understand the TODO as the
intent to expand into other per-executor metrics. the way you phrased it at
the implementation level initially distracted me. so then, two Qs:
a. why is the pre-processing required, rather than the JIT
initialization/vivification you do now (e.g. the `computeIfAbsent`)?
b. any other per-executor stats in mind?
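Question (a) can be illustrated with a sketch that assumes per-executor success and failure counts are created on first use. This is the same JIT approach as the existing `computeIfAbsent`, and the ratio is computed on read, so no upfront list of executors is needed. `ExecutorOutcomeStats` and its methods are hypothetical. In Gobblin, the ratio might instead be exposed through something like the `RatioGauge` the TODO mentions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-executor success/failure tracking with lazily created entries.
final class ExecutorOutcomeStats {
  private static final class Counts {
    final LongAdder succeeded = new LongAdder();
    final LongAdder failed = new LongAdder();
  }

  // Keyed by spec executor URI string, following the byX naming suggested above.
  private final Map<String, Counts> countsByExecutorUri = new ConcurrentHashMap<>();

  // The first sighting of an executor creates its counters; no preprocessing step.
  private Counts countsFor(String executorUri) {
    return countsByExecutorUri.computeIfAbsent(executorUri, uri -> new Counts());
  }

  void recordSuccess(String executorUri) {
    countsFor(executorUri).succeeded.increment();
  }

  void recordFailure(String executorUri) {
    countsFor(executorUri).failed.increment();
  }

  // Success ratio for one executor, or NaN if it has no recorded outcomes yet.
  double successRatio(String executorUri) {
    Counts c = countsByExecutorUri.get(executorUri);
    if (c == null) {
      return Double.NaN;
    }
    long ok = c.succeeded.sum();
    long total = ok + c.failed.sum();
    return total == 0 ? Double.NaN : (double) ok / total;
  }
}
```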
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1098,6 +1105,19 @@ private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Ma
group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
}
+ /**
+ * Used to track metrics for different specExecutors to detect issues with the specExecutor itself
+ * @param dagNode
+ * @param meterName
+ * @param meterMap
+ * @return
+ */
+ private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+ String executorName = dagNode.getValue().getSpecExecutor().getUri().toString();
Review Comment:
if we lack a `DagManagerUtils.getExecutorName`, should we add one?
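If no such helper exists, a minimal version might look like the sketch below. The method name `getExecutorName` comes from the comment above. Everything else is an assumption: the class name `ExecutorNames` is hypothetical, and the helper takes a `java.net.URI` rather than a `DagNode`, on the assumption that `getSpecExecutor().getUri()` returns a `URI`. This keeps the sketch self-contained.

```java
import java.net.URI;
import java.util.Objects;

// Hypothetical utility mirroring the suggested DagManagerUtils.getExecutorName.
final class ExecutorNames {
  private ExecutorNames() {
  }

  // Single place that decides how an executor is keyed in metric names and meter maps.
  static String getExecutorName(URI specExecutorUri) {
    return Objects.requireNonNull(specExecutorUri, "specExecutorUri").toString();
  }
}
```

A call site like `getExecutorMeterForDag` would then pass `dagNode.getValue().getSpecExecutor().getUri()` to the helper, so every per-executor metric keys on the same string.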
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -736,6 +736,9 @@ public void testJobStartSLAKilledDag() throws URISyntaxException, IOException {
Assert.assertEquals(this.dagToJobs.size(), 1);
Assert.assertTrue(this.dags.containsKey(dagId1));
+ String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), 1);
Review Comment:
would it be worth verifying that meters are per-executor, not a single
aggregation?
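The check suggested above can be sketched with the standard library alone. The sketch marks an SLA-exceeded event for one executor and confirms that a second executor's meter is untouched, a test that a single aggregated meter would fail. Several details are assumptions for illustration: the executor names, the `PerExecutorSlaCheck` class, the placeholder prefix and meter strings (not the real `ServiceMetricNames` values), and the plain map used in place of `metricContext`. The real test would look meters up via `MetricRegistry.name(...)`, as in the diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-executor SLA meter registry showing the shape of the assertion.
final class PerExecutorSlaCheck {
  // Placeholder strings standing in for GOBBLIN_SERVICE_PREFIX and START_SLA_EXCEEDED_FLOWS_METER.
  static final String PREFIX = "servicePrefix";
  static final String METER = "startSlaExceededFlows";

  private final Map<String, AtomicLong> countsByMeterName = new ConcurrentHashMap<>();

  // Dotted name of the form prefix.executor.meter, one name per executor.
  static String meterName(String executorName) {
    return String.join(".", PREFIX, executorName, METER);
  }

  void markSlaExceeded(String executorName) {
    countsByMeterName.computeIfAbsent(meterName(executorName), k -> new AtomicLong()).incrementAndGet();
  }

  // Count for one executor's meter; 0 if that meter was never created.
  long count(String executorName) {
    AtomicLong c = countsByMeterName.get(meterName(executorName));
    return c == null ? 0 : c.get();
  }
}
```

In `DagManagerTest`, the analogous step would presumably dispatch DAGs to two different spec executors and assert on both meter names.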
Issue Time Tracking
-------------------
Worklog Id: (was: 766897)
Time Spent: 0.5h (was: 20m)
> Create metrics for sla exceeded flows
> -------------------------------------
>
> Key: GOBBLIN-1641
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1641
> Project: Apache Gobblin
> Issue Type: Task
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Currently the DagManager does not have metrics for how many flows exceed
> their SLA per executor, so we should track this.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)