This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 495bdafe5 [GOBBLIN-1934] Monitor High Level Consumer queue size (#3805)
495bdafe5 is described below

commit 495bdafe58fa7105e725af55e01d38a5afc3d4d8
Author: umustafi <[email protected]>
AuthorDate: Tue Oct 24 14:01:44 2023 -0700

    [GOBBLIN-1934] Monitor High Level Consumer queue size (#3805)
    
    * Emit metrics to monitor high level consumer queue size
    
    * Empty commit to trigger tests
    
    * Use BlockingQueue.size() func instead of atomic integer array
    
    * Remove unused import & add DagActionChangeMonitor prefix to metric
    
    * Refactor to avoid repeating code
    
    * Make protected variables private where possible
    
    * Fix white space
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../gobblin/runtime/kafka/HighLevelConsumer.java   | 24 ++++++++++++++++++++--
 .../gobblin/runtime/metrics/RuntimeMetrics.java    |  1 +
 .../monitoring/DagActionStoreChangeMonitor.java    |  9 ++++++--
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 7b494201e..1cba016bd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -51,6 +51,7 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
@@ -101,6 +102,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
   private final ScheduledExecutorService consumerExecutor;
   private final ExecutorService queueExecutor;
   private final BlockingQueue[] queues;
+  private ContextAwareGauge[] queueSizeGauges;
   private final AtomicInteger recordsProcessed;
   private final Map<KafkaPartition, Long> partitionOffsetsToCommit;
   private final boolean enableAutoCommit;
@@ -127,7 +129,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
     this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HighLevelConsumerThread")));
     this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("QueueProcessor-%d")));
     this.queues = new LinkedBlockingQueue[numThreads];
-    for(int i=0; i<queues.length; i++) {
+    for(int i = 0; i < queues.length; i++) {
       this.queues[i] = new LinkedBlockingQueue();
     }
     this.recordsProcessed = new AtomicInteger(0);
@@ -196,7 +198,24 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
    * this method to instantiate their own metrics.
    */
   protected void createMetrics() {
-    this.messagesRead = this.metricContext.counter(RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
+    String prefix = getMetricsPrefix();
+    this.messagesRead = this.metricContext.counter(prefix +
+        RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
+    this.queueSizeGauges = new ContextAwareGauge[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      // An 'effectively' final variable is needed inside the lambda expression below
+      int finalI = i;
+      this.queueSizeGauges[i] = this.metricContext.newContextAwareGauge(prefix +
+          RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX + "-" + i,
+          () -> queues[finalI].size());
+    }
+  }
+
+  /**
+   * Used by child classes to distinguish prefixes from one another
+   */
+  protected String getMetricsPrefix() {
+    return "";
   }
 
   /**
@@ -237,6 +256,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
   private void consume() {
     try {
       Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
+      // TODO: we may be committing too early and only want to commit after process messages
       if(!enableAutoCommit) {
         commitOffsets();
       }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 8fc1258ab..dc4e26e91 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -28,6 +28,7 @@ public class RuntimeMetrics {
   // Metric names
   public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ =
       "gobblin.kafka.highLevelConsumer.messagesRead";
+  public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX = "gobblin.kafka.highLevelConsumer.queueSize";
   public static final String GOBBLIN_JOB_MONITOR_KAFKA_TOTAL_SPECS = "gobblin.jobMonitor.kafka.totalSpecs";
   public static final String GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS = "gobblin.jobMonitor.kafka.newSpecs";
   public static final String GOBBLIN_JOB_MONITOR_KAFKA_UPDATED_SPECS = "gobblin.jobMonitor.kafka.updatedSpecs";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index e5a2d090d..6855ca669 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -34,7 +34,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -217,7 +216,8 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
 
   @Override
   protected void createMetrics() {
-    super.messagesRead = this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
+    super.createMetrics();
+    // Dag Action specific metrics
     this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
     this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.flowsLaunched = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
@@ -228,4 +228,9 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
     this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
     this.getMetricContext().register(this.produceToConsumeDelayMillis);
   }
+
+  @Override
+  protected String getMetricsPrefix() {
+    return RuntimeMetrics.DAG_ACTION_STORE_MONITOR_PREFIX + ".";
+  }
 }

Reply via email to