phet commented on a change in pull request #3411:
URL: https://github.com/apache/gobblin/pull/3411#discussion_r726399881



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -118,6 +126,10 @@
 
   private final Retryer<Void> persistJobStatusRetryer;
 
+  private final ScheduledExecutorService scheduler;
+
+  private final ArrayList<ScheduledFuture> futuresList;

Review comment:
   For these (and a few other names), I recommend naming semantically rather than merely after the abstraction, e.g. `metricRemovalScheduler`, `pendingRemovalFutures`, etc.

   The choice is yours, but I find this better conveys context, and it anticipates the need to disambiguate if/when we again use the same tool (i.e. class) for unrelated ends.

   (For example, naming the other `ScheduledExecutorService scheduledExecutorService` just above the same way would reduce the current potential for confusion.)

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -148,6 +160,10 @@ public KafkaJobStatusMonitor(String topic, Config config, int numThreads, JobIss
             }
           }
         }));
+
+    // Used to remove gauges in the background
+    this.scheduler = Executors.newScheduledThreadPool(1);
+    this.futuresList = new ArrayList<>();

Review comment:
   `Collections.synchronizedList`? `ArrayList` isn't thread-safe (its Javadoc states that the implementation is not synchronized).
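   A minimal, self-contained illustration of the suggestion, assuming the list may be appended to from more than one thread (the class and method names are hypothetical). `Collections.synchronizedList` makes each individual `add` safe, but iterating the list still requires synchronizing on it manually, as its Javadoc specifies:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SynchronizedFuturesListDemo {
  // Each of `threads` workers appends `perThread` items to one shared list.
  // Returns the number of elements that ended up in the list.
  static int concurrentAdds(int threads, int perThread) {
    // Every single method call goes through the wrapper's internal lock.
    List<Integer> pending = Collections.synchronizedList(new ArrayList<>());
    ExecutorService workers = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      workers.execute(() -> {
        for (int i = 0; i < perThread; i++) {
          pending.add(i);
        }
      });
    }
    workers.shutdown();
    try {
      workers.awaitTermination(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    // Iteration is a compound action: per the Javadoc, hold the list's lock manually.
    int count = 0;
    synchronized (pending) {
      for (Integer ignored : pending) {
        count++;
      }
    }
    return count;
  }
}
```

   With a plain `ArrayList` in place of the wrapper, the count can come up short or an `ArrayIndexOutOfBoundsException` can be thrown, though such races are not guaranteed to show up on any given run.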

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -346,3 +372,18 @@ public static long getExecutionIdFromTableName(String tableName) {
   protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);
 
 }
+
+@Slf4j
+class RunnableWithMetricContext implements Runnable {
+  private MetricContext _metricContext;
+  private String gaugeToRemove;
+
+  public RunnableWithMetricContext(MetricContext providedContext, String gaugeName) {
+    this._metricContext = providedContext;
+    this.gaugeToRemove = gaugeName;
+  }
+
+  public void run() {

Review comment:
   It's generally a best practice to wrap executor tasks in a `catch (Throwable t)` (I presume `.remove` could throw something). Since you're not checking the returned `Future` for exceptions, logging there would be our only way to learn the specifics of an issue.
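   A sketch of that pattern, with a plain `Map` standing in for `MetricContext` and `java.util.logging` standing in for the class's `@Slf4j` logger (both assumptions, so the example runs without Gobblin). An exception thrown by a task passed to `schedule` is captured in its `ScheduledFuture` and only surfaces through `get()`, so without the catch it would be lost silently:

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

class GaugeRemovalTask implements Runnable {
  private static final Logger LOG = Logger.getLogger(GaugeRemovalTask.class.getName());

  private final Map<String, ?> registry;  // stand-in for MetricContext (assumption)
  private final String gaugeToRemove;
  // Hypothetical hook recording the last failure, so callers or tests can observe it.
  final AtomicReference<Throwable> lastFailure = new AtomicReference<>();

  GaugeRemovalTask(Map<String, ?> registry, String gaugeToRemove) {
    this.registry = registry;
    this.gaugeToRemove = gaugeToRemove;
  }

  @Override
  public void run() {
    try {
      registry.remove(gaugeToRemove);
    } catch (Throwable t) {
      // Nobody inspects the ScheduledFuture, so this log line is the only record of the failure.
      lastFailure.set(t);
      LOG.log(Level.WARNING, "Failed to remove gauge " + gaugeToRemove, t);
    }
  }
}
```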

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -169,6 +185,11 @@ public void shutDown() {
      this.scheduledExecutorService.shutdown();
      try {
        this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+
+       // Upon shutdown, cancel any pending removals of gauges

Review comment:
   Actually, thinking about what you're doing with `.cancel(true)`, maybe you just want `.shutdownNow()`, which would dispense with the need to keep `futuresList` for bookkeeping.
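   A minimal, Gobblin-independent demonstration (the class name is illustrative): `shutdownNow()` drains whatever delayed tasks are still queued and returns them without running them, so no separate list of futures is needed just to cancel them. The 90-second delay mirrors `EXPECTED_TIME_FOR_METRICS_REPORTING`:

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownNowDemo {
  // Counts how many scheduled "gauge removals" actually executed.
  static final AtomicInteger removalsRun = new AtomicInteger();

  // Schedules `n` delayed removals, then shuts the scheduler down immediately.
  // Returns the number of queued tasks that shutdownNow() handed back unexecuted.
  static int scheduleThenShutdownNow(int n) {
    ScheduledExecutorService metricRemovalScheduler = Executors.newScheduledThreadPool(1);
    for (int i = 0; i < n; i++) {
      metricRemovalScheduler.schedule(() -> { removalsRun.incrementAndGet(); }, 90, TimeUnit.SECONDS);
    }
    // Drains the delay queue (and would interrupt a running task); nothing queued runs afterwards.
    List<Runnable> neverRun = metricRemovalScheduler.shutdownNow();
    return neverRun.size();
  }
}
```

   As with `.cancel(true)`, the pending removals are dropped rather than executed.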

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -169,6 +185,11 @@ public void shutDown() {
      this.scheduledExecutorService.shutdown();
      try {
        this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+
+       // Upon shutdown, cancel any pending removals of gauges

Review comment:
   Should `.shutdown()` also be called on the (new) `scheduler`, so it no longer accepts tasks?

   (If it's logically certain that no more could be submitted, explain the reasoning here in a comment, so it's clear this wasn't left out by mistake.)
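   A small check of the behavior in question, assuming the default `ScheduledThreadPoolExecutor` returned by `Executors.newScheduledThreadPool` (class and method names are illustrative). After `shutdown()`, further `schedule` calls are rejected with `RejectedExecutionException`, while delayed tasks queued beforehand still run by default:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownRejectsDemo {
  // Returns true if schedule() is rejected once shutdown() has been called.
  static boolean rejectsAfterShutdown() {
    ScheduledExecutorService metricRemovalScheduler = Executors.newScheduledThreadPool(1);
    metricRemovalScheduler.shutdown();
    try {
      metricRemovalScheduler.schedule(() -> { }, 90, TimeUnit.SECONDS);
      return false;
    } catch (RejectedExecutionException expected) {
      return true;
    }
  }

  // Returns true if a delayed task queued *before* shutdown() still executes,
  // which is the default policy of the pool returned by newScheduledThreadPool.
  static boolean queuedTaskStillRuns() {
    ScheduledExecutorService metricRemovalScheduler = Executors.newScheduledThreadPool(1);
    AtomicBoolean ran = new AtomicBoolean(false);
    metricRemovalScheduler.schedule(() -> ran.set(true), 50, TimeUnit.MILLISECONDS);
    metricRemovalScheduler.shutdown();
    try {
      metricRemovalScheduler.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return ran.get();
  }
}
```

   So `.shutdown()` alone blocks new submissions but still lets pending removals fire; to drop them instead, use `shutdownNow()` or call `setExecuteExistingDelayedTasksAfterShutdownPolicy(false)` on a `ScheduledThreadPoolExecutor`.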

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -323,6 +344,11 @@ private void emitWorkUnitCountMetric(GobblinTrackingEvent event) {
           () -> this.flowNameGroupToWorkUnitCount.get(workUnitCountName));
       this.getMetricContext().register(workUnitCountName, gauge);
     }
+
+   // Schedule this gauge to be removed after emission
+    Runnable task = new RunnableWithMetricContext(this.getMetricContext(), workUnitCountName);
+    ScheduledFuture<?> future = this.scheduler.schedule(task, EXPECTED_TIME_FOR_METRICS_REPORTING, TimeUnit.SECONDS);
+    this.futuresList.add(future);;

Review comment:
   Is there a corresponding `remove` to bound the list's growth?
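   If the list is kept at all, one hypothetical way to bound it is to prune futures that are already done whenever a new one is added, so it only ever holds removals that are still pending. The class and method names are illustrative, not from the PR; `removeIf` on a `Collections.synchronizedList` wrapper runs under the wrapper's lock:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PendingRemovals {
  private final ScheduledExecutorService metricRemovalScheduler = Executors.newScheduledThreadPool(1);
  private final List<ScheduledFuture<?>> pendingRemovalFutures =
      Collections.synchronizedList(new ArrayList<>());

  // Drops futures that have completed (or been cancelled), then tracks the new one.
  void schedule(Runnable removal, long delay, TimeUnit unit) {
    pendingRemovalFutures.removeIf(Future::isDone);
    pendingRemovalFutures.add(metricRemovalScheduler.schedule(removal, delay, unit));
  }

  int pendingCount() {
    return pendingRemovalFutures.size();
  }

  void shutdownNow() {
    metricRemovalScheduler.shutdownNow();
  }
}
```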

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -323,6 +344,11 @@ private void emitWorkUnitCountMetric(GobblinTrackingEvent event) {
           () -> this.flowNameGroupToWorkUnitCount.get(workUnitCountName));
       this.getMetricContext().register(workUnitCountName, gauge);
     }
+
+   // Schedule this gauge to be removed after emission
+    Runnable task = new RunnableWithMetricContext(this.getMetricContext(), workUnitCountName);
+    ScheduledFuture<?> future = this.scheduler.schedule(task, EXPECTED_TIME_FOR_METRICS_REPORTING, TimeUnit.SECONDS);

Review comment:
   There's really a larger issue: the WU counts of separate jobs that are part of the same flow get conflated. The corner case of missed metrics may not be worth pursuing as much as differentiating multiple jobs in the same flow would be, since reporting is limited to at most once per minute, so the metrics of two jobs collide anyway when they run in rapid sequence.
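   A hypothetical sketch of differentiating jobs within one flow: include the job name in the gauge name (and hence in the key used for the work-unit-count map), so two jobs of the same flow register separate gauges instead of overwriting one. The name layout is illustrative only and is not Gobblin's actual metric naming scheme:

```java
final class WorkUnitCountNames {
  private WorkUnitCountNames() { }

  // Hypothetical layout: <flowGroup>.<flowName>.<jobName>.workUnitCount
  // Including the job name gives each job in a flow its own gauge.
  static String gaugeName(String flowGroup, String flowName, String jobName) {
    return String.join(".", flowGroup, flowName, jobName, "workUnitCount");
  }
}
```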

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -323,6 +344,11 @@ private void emitWorkUnitCountMetric(GobblinTrackingEvent event) {
           () -> this.flowNameGroupToWorkUnitCount.get(workUnitCountName));
       this.getMetricContext().register(workUnitCountName, gauge);
     }
+
+   // Schedule this gauge to be removed after emission
+    Runnable task = new RunnableWithMetricContext(this.getMetricContext(), workUnitCountName);

Review comment:
   YMMV, but you may not need a heavier-weight named abstraction; you could consider a lambda that closes over these variables. (Entirely up to you.)
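   Roughly what the lambda version might look like, again with a `Map` standing in for `MetricContext` (an assumption, so the sketch runs standalone; the helper name is hypothetical). The captured `registry` and `gaugeName` must be effectively final, and the `catch (Throwable t)` suggested earlier fits naturally inside the lambda body:

```java
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

class LambdaRemovalDemo {
  private static final Logger LOG = Logger.getLogger(LambdaRemovalDemo.class.getName());

  // Schedules removal of `gaugeName` from `registry` with a lambda instead of a named Runnable class.
  static ScheduledFuture<?> scheduleRemoval(ScheduledExecutorService scheduler,
      Map<String, ?> registry, String gaugeName, long delay, TimeUnit unit) {
    return scheduler.schedule(() -> {
      try {
        registry.remove(gaugeName);
      } catch (Throwable t) {
        LOG.log(Level.WARNING, "Failed to remove gauge " + gaugeName, t);
      }
    }, delay, unit);
  }
}
```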

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -97,6 +103,8 @@
   private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
   private static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
 
+  private static final int EXPECTED_TIME_FOR_METRICS_REPORTING = 90;

Review comment:
   Are you concerned they may not have been sent yet (since this is only 1.5x the reporting interval)?
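   One hypothetical option, if the concern is real: derive the delay from the reporting interval instead of hard-coding 90 seconds. Assuming a 60-second reporting interval (consistent with the once-per-minute reporting limit noted in this review) and reports that fire on schedule, a gauge kept for `cycles` full intervals is included in at least `cycles` reports; the helper name and parameters are illustrative:

```java
final class MetricRemovalDelay {
  private MetricRemovalDelay() { }

  // Hypothetical helper: keep a gauge registered for `cycles` full reporting intervals
  // plus `slackSeconds`, so that (if reports fire on time) at least `cycles` reports include it.
  static long removalDelaySeconds(long reportingIntervalSeconds, int cycles, long slackSeconds) {
    if (reportingIntervalSeconds <= 0 || cycles < 1 || slackSeconds < 0) {
      throw new IllegalArgumentException("invalid interval, cycle count, or slack");
    }
    return reportingIntervalSeconds * cycles + slackSeconds;
  }
}
```

   With a 60 s interval, `removalDelaySeconds(60, 1, 30)` reproduces the current 90 s (one guaranteed report, 30 s of slack), while `removalDelaySeconds(60, 2, 30)` gives 150 s and still covers the gauge if one of the two reports is late or fails.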




-- 
This is an automated message from the Apache Git Service.