This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e392c97de [GOBBLIN-1978] Initialize dag action store monitor metrics (#3851)
e392c97de is described below

commit e392c97debfbcd3cd8e687517f5eb1bf904286d7
Author: umustafi <[email protected]>
AuthorDate: Fri Dec 22 11:03:27 2023 -0800

    [GOBBLIN-1978] Initialize dag action store monitor metrics (#3851)
    
    * Create dagAction handling metrics when monitor is initialized
    
    * Add warning comment
    
    * Initialize all metrics and context earlier
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../gobblin/runtime/kafka/HighLevelConsumer.java   |  6 +++---
 .../monitoring/DagActionStoreChangeMonitor.java    | 23 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 1cba016bd..82a45ceaf 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -99,7 +99,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
   protected Counter messagesRead;
   @Getter
   private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
-  private final ScheduledExecutorService consumerExecutor;
+  protected final ScheduledExecutorService consumerExecutor;
   private final ExecutorService queueExecutor;
   private final BlockingQueue[] queues;
   private ContextAwareGauge[] queueSizeGauges;
@@ -253,7 +253,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
    * Note: All records from a KafkaPartition are added to the same queue.
    * A queue can contain records from multiple partitions if partitions > numThreads(queues)
    */
-  private void consume() {
+  protected void consume() {
     try {
       Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
       // TODO: we may be committing too early and only want to commit after process messages
@@ -275,7 +275,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
    * Assigns a queue to each thread of the {@link #queueExecutor}
    * Note: Assumption here is that {@link #numThreads} is same a number of queues
    */
-  private void processQueues() {
+  protected void processQueues() {
     for(BlockingQueue queue : queues) {
       queueExecutor.execute(new QueueProcessor(queue));
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 34ebaa722..647bb8089 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -110,6 +110,12 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
     this.orchestrator = orchestrator;
     this.dagActionStore = dagActionStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+
+    /*
+    Metrics need to be created before initializeMonitor() below is called (or more specifically handleDagAction() is
+    called on any dagAction)
+     */
+    buildMetricsContextAndMetrics();
   }
 
   @Override
@@ -136,6 +142,23 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
     }
   }
 
+  /*
+   Override this method to do the same sequence as the parent class, except create metrics. Instead, we create metrics
+   earlier upon class initialization because they are used immediately as dag actions are loaded and processed from
+   the DagActionStore.
+  */
+  @Override
+  protected void startUp() {
+    // Method that starts threads that processes queues
+    processQueues();
+    // Main thread that constantly polls messages from kafka
+    consumerExecutor.execute(() -> {
+      while (!shutdownRequested) {
+        consume();
+      }
+    });
+  }
+
   @Override
   /*
   This class is multithreaded and this method will be called by multiple threads, however any given message will be

Reply via email to