[ https://issues.apache.org/jira/browse/GOBBLIN-1560?focusedWorklogId=663715&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-663715 ]
ASF GitHub Bot logged work on GOBBLIN-1560:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 11/Oct/21 18:53
Start Date: 11/Oct/21 18:53
Worklog Time Spent: 10m
Work Description: phet commented on a change in pull request #3411:
URL: https://github.com/apache/gobblin/pull/3411#discussion_r726399881
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -118,6 +126,10 @@
private final Retryer<Void> persistJobStatusRetryer;
+ private final ScheduledExecutorService scheduler;
+
+ private final ArrayList<ScheduledFuture> futuresList;
Review comment:
With these (and a few other names), I recommend naming semantically
rather than merely after the abstraction, e.g. `metricRemovalScheduler`,
`pendingRemovalFutures`, etc.
The choice is yours... I find that it better conveys context, even anticipating
disambiguation if/when we again use the same tool (i.e. class) for unrelated
ends.
(E.g. naming the other `ScheduledExecutorService scheduledExecutorService` just
above the same way would similarly reduce the present potential for confusion.)
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -148,6 +160,10 @@ public KafkaJobStatusMonitor(String topic, Config config, int numThreads, JobIss
}
}
}));
+
+ // Used to remove gauges in the background
+ this.scheduler = Executors.newScheduledThreadPool(1);
+ this.futuresList = new ArrayList<>();
Review comment:
`Collections.synchronizedList`? ...`ArrayList` isn't thread-safe (its
Javadoc states that the implementation is not synchronized).
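A minimal sketch of the suggested fix, assuming the list is only touched through calls like these. `Collections.synchronizedList` makes each individual call (`add`, `size`, ...) thread-safe, but iterating over the list still requires synchronizing on the list itself, as the `Collections.synchronizedList` Javadoc notes. The class and method names are hypothetical, and Strings stand in for the PR's `ScheduledFuture` elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical holder; Strings stand in for the PR's ScheduledFuture elements.
final class SynchronizedListSketch {

  // The wrapper synchronizes each individual call (add, size, removeIf, ...).
  private final List<String> pendingRemovalFutures =
      Collections.synchronizedList(new ArrayList<>());

  void add(String item) {
    pendingRemovalFutures.add(item);
  }

  int size() {
    return pendingRemovalFutures.size();
  }

  // Iteration is NOT covered by the wrapper, so callers must lock the list itself.
  int countWithPrefix(String prefix) {
    int count = 0;
    synchronized (pendingRemovalFutures) {
      for (String item : pendingRemovalFutures) {
        if (item.startsWith(prefix)) {
          count++;
        }
      }
    }
    return count;
  }

  // Adds items from several threads at once; a plain ArrayList could lose
  // some of these adds or throw, while the synchronized wrapper keeps them all.
  static SynchronizedListSketch fillConcurrently(int threads, int perThread) {
    SynchronizedListSketch sketch = new SynchronizedListSketch();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      final int id = t;
      pool.execute(() -> {
        for (int i = 0; i < perThread; i++) {
          sketch.add("gauge-" + id + "-" + i);
        }
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sketch;
  }
}
```

A `ConcurrentLinkedQueue` would be another option if the futures are only ever appended and scanned.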
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -346,3 +372,18 @@ public static long getExecutionIdFromTableName(String tableName) {
protected abstract org.apache.gobblin.configuration.State
parseJobStatus(GobblinTrackingEvent event);
}
+
+@Slf4j
+class RunnableWithMetricContext implements Runnable {
+ private MetricContext _metricContext;
+ private String gaugeToRemove;
+
+ public RunnableWithMetricContext(MetricContext providedContext, String gaugeName) {
+ this._metricContext = providedContext;
+ this.gaugeToRemove = gaugeName;
+ }
+
+ public void run() {
Review comment:
It's generally a best practice to wrap executor tasks in a `catch (Throwable
t)` (I presume `.remove` could throw something...). Since you're not checking
the returned `Future` for exceptions, logging there would be our only way to
learn the specifics of an issue.
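A minimal sketch of the suggested guard. When a task passed to `schedule(...)` throws, the exception is stored in the returned `Future` and is never seen unless someone calls `get()`; catching and logging inside the task surfaces it. `java.util.logging` stands in for the PR's `@Slf4j` logger, and `GuardedTaskSketch`/`guarded` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class GuardedTaskSketch {
  // java.util.logging stands in for the PR's @Slf4j logger.
  private static final Logger LOG = Logger.getLogger(GuardedTaskSketch.class.getName());

  // Wraps an executor task so that any Throwable is logged. Without this, an
  // exception thrown by a scheduled task is only stored in its Future, and is
  // never seen if nobody calls get() on that Future.
  static Runnable guarded(String description, Runnable task) {
    return () -> {
      try {
        task.run();
      } catch (Throwable t) {
        LOG.log(Level.WARNING, "Failed to " + description, t);
      }
    };
  }
}
```

In the PR's terms, usage might look like `this.scheduler.schedule(guarded("remove gauge " + workUnitCountName, task), EXPECTED_TIME_FOR_METRICS_REPORTING, TimeUnit.SECONDS)`.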
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -169,6 +185,11 @@ public void shutDown() {
this.scheduledExecutorService.shutdown();
try {
this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+
+ // Upon shutdown, cancel any pending removals of gauges
Review comment:
Actually, given what you're doing with `.cancel(true)`, maybe you
just want `.shutdownNow()`, which would dispense with the need to bookkeep `futuresList`.
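A small sketch of why `.shutdownNow()` could replace the bookkeeping: on a scheduled executor it removes the tasks still waiting in the delay queue, returns them without running them, and interrupts any task currently executing (much like `.cancel(true)` does per future). The class name and task count are illustrative; the 90-second delay mirrors the PR's `EXPECTED_TIME_FOR_METRICS_REPORTING`.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownNowSketch {

  // Schedules `count` delayed removals, then shuts the scheduler down at once.
  // Returns the number of tasks that shutdownNow() handed back without running.
  static int scheduleThenShutdownNow(int count, AtomicInteger executed) {
    ScheduledExecutorService metricRemovalScheduler = Executors.newScheduledThreadPool(1);
    for (int i = 0; i < count; i++) {
      // A 90 s delay (as in the PR) means none of these can have started yet.
      metricRemovalScheduler.schedule(() -> { executed.incrementAndGet(); }, 90, TimeUnit.SECONDS);
    }
    // No list of ScheduledFutures is needed: shutdownNow() drains the delay
    // queue and interrupts any task that is currently running.
    List<Runnable> neverRun = metricRemovalScheduler.shutdownNow();
    return neverRun.size();
  }
}
```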
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -169,6 +185,11 @@ public void shutDown() {
this.scheduledExecutorService.shutdown();
try {
this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+
+ // Upon shutdown, cancel any pending removals of gauges
Review comment:
Also call `.shutdown()` on the (new) `scheduler`, so it no longer accepts tasks?
(But if it's logically certain that no more could be submitted, explain the
reasoning here in a comment, so it's clear this wasn't left out.)
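A sketch of what calling `.shutdown()` on the scheduler buys: any later `schedule(...)` call is rejected with a `RejectedExecutionException` (under the default abort policy) instead of queueing work on a service that is going away. One caveat: a `ScheduledThreadPoolExecutor` (the class `Executors.newScheduledThreadPool` returns in OpenJDK) by default still runs delayed tasks that were already queued when `shutdown()` was called, so pending gauge removals would still fire unless that policy is turned off or `shutdownNow()` is used. Class and method names are illustrative.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {

  // After shutdown(), new schedule(...) calls are rejected by the default policy.
  static boolean scheduleAfterShutdownIsRejected() {
    ScheduledExecutorService metricRemovalScheduler = Executors.newScheduledThreadPool(1);
    metricRemovalScheduler.shutdown();
    try {
      metricRemovalScheduler.schedule(() -> { }, 90, TimeUnit.SECONDS);
      return false;
    } catch (RejectedExecutionException expected) {
      return true;
    }
  }

  // By default, already-queued delayed tasks still run after a plain shutdown();
  // setExecuteExistingDelayedTasksAfterShutdownPolicy(false) changes that.
  static boolean runsQueuedDelayedTasksAfterShutdownByDefault() {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    try {
      return executor.getExecuteExistingDelayedTasksAfterShutdownPolicy();
    } finally {
      executor.shutdownNow();
    }
  }
}
```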
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -323,6 +344,11 @@ private void emitWorkUnitCountMetric(GobblinTrackingEvent event) {
() -> this.flowNameGroupToWorkUnitCount.get(workUnitCountName));
this.getMetricContext().register(workUnitCountName, gauge);
}
+
+ // Schedule this gauge to be removed after emission
+ Runnable task = new RunnableWithMetricContext(this.getMetricContext(), workUnitCountName);
+ ScheduledFuture<?> future = this.scheduler.schedule(task, EXPECTED_TIME_FOR_METRICS_REPORTING, TimeUnit.SECONDS);
+ this.futuresList.add(future);;
Review comment:
Is there a corresponding `remove` to bound the list's growth?
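One way to bound the list's growth, sketched under the assumption that it stays a `Collections.synchronizedList`: prune futures that are already done before adding each new one (the synchronized wrapper's `removeIf` holds the list's lock for the whole scan). `BoundedFuturesSketch` and `trackRemoval` are hypothetical names. The `.shutdownNow()` approach from the earlier comment would avoid the list altogether.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;

final class BoundedFuturesSketch {
  private final List<Future<?>> pendingRemovalFutures =
      Collections.synchronizedList(new ArrayList<>());

  // Prunes futures that are already done (completed, failed or cancelled)
  // before tracking the new one, so only still-pending removals are kept.
  void trackRemoval(Future<?> future) {
    pendingRemovalFutures.removeIf(Future::isDone);
    pendingRemovalFutures.add(future);
  }

  int trackedCount() {
    return pendingRemovalFutures.size();
  }
}
```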
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -323,6 +344,11 @@ private void emitWorkUnitCountMetric(GobblinTrackingEvent event) {
() -> this.flowNameGroupToWorkUnitCount.get(workUnitCountName));
this.getMetricContext().register(workUnitCountName, gauge);
}
+
+ // Schedule this gauge to be removed after emission
+ Runnable task = new RunnableWithMetricContext(this.getMetricContext(), workUnitCountName);
+ ScheduledFuture<?> future = this.scheduler.schedule(task, EXPECTED_TIME_FOR_METRICS_REPORTING, TimeUnit.SECONDS);
Review comment:
There's really the larger issue of conflating the WU counts of separate
jobs that are part of the same flow. The corner case of missed metrics may be
less worth pursuing than differentiating multiple jobs within the same flow:
since reporting is limited to at most once per minute, the metrics of two jobs
collide anyway when they run in rapid sequence.
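To illustrate differentiating jobs within a flow, a hypothetical sketch of a per-job gauge name. The metric in this issue's inGraphs link appears to be named `<flowGroup>.<flowName>.WorkUnitsCreated`; adding the job name as its own segment would give each job a separate gauge. The method name, segment order and suffix placement are assumptions, not Gobblin's actual naming scheme.

```java
final class PerJobGaugeNameSketch {
  // Hypothetical suffix, mirroring the metric name in this issue's inGraphs link.
  static final String WORK_UNITS_CREATED = "WorkUnitsCreated";

  // Adds the job name as its own segment, so two jobs of the same flow
  // no longer write to (and overwrite) a single shared gauge.
  static String workUnitCountGaugeName(String flowGroup, String flowName, String jobName) {
    return String.join(".", flowGroup, flowName, jobName, WORK_UNITS_CREATED);
  }
}
```

Per-job gauges would still need the same scheduled removal, or they would accumulate in the metric context.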
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -323,6 +344,11 @@ private void emitWorkUnitCountMetric(GobblinTrackingEvent event) {
() -> this.flowNameGroupToWorkUnitCount.get(workUnitCountName));
this.getMetricContext().register(workUnitCountName, gauge);
}
+
+ // Schedule this gauge to be removed after emission
+ Runnable task = new RunnableWithMetricContext(this.getMetricContext(), workUnitCountName);
Review comment:
YMMV, but you may not need a heavier-weight named abstraction... you could
consider a lambda closing over these variables instead (entirely up to you).
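A sketch of the lambda alternative. A `ConcurrentHashMap` stands in for the PR's `MetricContext`, purely to keep the example self-contained; in the PR the lambda body would call the same removal that `RunnableWithMetricContext.run()` performs. Captured variables must be effectively final, which `workUnitCountName` already is, since the gauge lambda in the diff captures it too. Class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class LambdaRemovalSketch {
  // Stand-in for the MetricContext: gauge name -> gauge.
  private final Map<String, Object> registeredGauges = new ConcurrentHashMap<>();

  // Daemon thread, so a forgotten shutdown() cannot keep the JVM alive.
  private final ScheduledExecutorService metricRemovalScheduler =
      Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "metric-removal");
        thread.setDaemon(true);
        return thread;
      });

  ScheduledFuture<?> registerThenScheduleRemoval(String gaugeName, Object gauge, long delayMillis) {
    registeredGauges.put(gaugeName, gauge);
    // The lambda closes over gaugeName (and this.registeredGauges), replacing
    // a dedicated Runnable class such as RunnableWithMetricContext.
    return metricRemovalScheduler.schedule(
        () -> { registeredGauges.remove(gaugeName); }, delayMillis, TimeUnit.MILLISECONDS);
  }

  boolean isRegistered(String gaugeName) {
    return registeredGauges.containsKey(gaugeName);
  }

  void shutdown() {
    metricRemovalScheduler.shutdownNow();
  }
}
```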
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -97,6 +103,8 @@
private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
private static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
+ private static final int EXPECTED_TIME_FOR_METRICS_REPORTING = 90;
Review comment:
Are you concerned they may not yet have been sent (given that this is only 1.5x
the reporting interval)?
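A worked version of that concern, assuming a 60-second reporting interval (implied by the "1.5x" and by the once-per-minute limit mentioned in another comment) and reports that fire exactly on schedule at multiples of that interval. A gauge registered at time `r` and removed at `r + 90` is seen by every report tick in `(r, r + 90]`. If it is registered just after a report, the only tick in that window comes 60 s later, leaving 30 s of slack; a report delayed by more than that would miss the value. `ReportingWindowSketch` and its methods are hypothetical helpers.

```java
final class ReportingWindowSketch {

  // Number of report ticks (at multiples of intervalSecs) in (registeredAtSecs, removedAtSecs].
  // Assumes non-negative times and perfectly punctual reports.
  static long reportTicksSeen(long registeredAtSecs, long removedAtSecs, long intervalSecs) {
    return Math.floorDiv(removedAtSecs, intervalSecs) - Math.floorDiv(registeredAtSecs, intervalSecs);
  }

  // Fewest ticks any registration time (within one interval) can observe for a removal delay.
  static long worstCaseTicks(long removalDelaySecs, long intervalSecs) {
    long min = Long.MAX_VALUE;
    for (long r = 0; r < intervalSecs; r++) {
      min = Math.min(min, reportTicksSeen(r, r + removalDelaySecs, intervalSecs));
    }
    return min;
  }
}
```

Under these assumptions a 90 s delay guarantees one report with 30 s of slack, while any delay below 60 s can miss a report entirely; a larger multiple of the interval would tolerate more reporter delay, at the cost of the gauge lingering longer.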
--
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 663715)
Time Spent: 1h 20m (was: 1h 10m)
> Fix WorkUnit Count Reporting to InGraphs
> -----------------------------------------
>
> Key: GOBBLIN-1560
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1560
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> At the moment, once a gauge for the work unit count is set, its value is
> continually emitted to inGraphs until the value changes, which makes it
> confusing for users viewing inGraphs such as
> [https://ingraphs.prod.linkedin.com/range/%25%25%25prod-lva1.product.disre-service-deployable2/graph/SharedGobblinServiceLva1/GobblinService.ktwo.k2-war-snap-sas-ad_page_sets-with-ret-1.WorkUnitsCreated.rrd?fabrics=prod-lva1]
> to tell what the most recently set value is. Instead, we will unregister this
> gauge after the interval within which we expect the value to have been emitted
> to inGraphs, so that the graph will appear as a set of discrete data points.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)