This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ca494435 [GOBBLIN-2037] Start DagActionMonitor functionality after 
its dependencies (#3916)
1ca494435 is described below

commit 1ca4944359fae3a7348a56427a2c38555811e977
Author: umustafi <[email protected]>
AuthorDate: Fri Apr 5 15:11:52 2024 -0700

    [GOBBLIN-2037] Start DagActionMonitor functionality after its dependencies 
(#3916)
    
    * Start DagActionMonitor functionality after its dependencies
    
    * Startup specStoreChangeMonitor later as well
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../modules/core/GobblinServiceManager.java        | 21 +++++-------
 .../monitoring/DagActionStoreChangeMonitor.java    | 40 +++++++++++++++-------
 .../DagActionStoreChangeMonitorFactory.java        |  1 -
 ...nagementDagActionStoreChangeMonitorFactory.java |  1 -
 .../service/monitoring/SpecStoreChangeMonitor.java | 23 +++++++++++++
 5 files changed, 59 insertions(+), 27 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index a5e8f102e..02b969a53 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -271,7 +271,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       throw new RuntimeException(String.format("getClass called to obtain %s 
without calling create method to "
           + "initialize GobblinServiceGuiceModule.", classToGet));
     }
-    Injector injector = Guice.createInjector(Stage.PRODUCTION, 
GOBBLIN_SERVICE_GUICE_MODULE);
+    // Use development stage to enable more verbose error messages and runtime 
checks
+    Injector injector = Guice.createInjector(Stage.DEVELOPMENT, 
GOBBLIN_SERVICE_GUICE_MODULE);
     return injector.getInstance(classToGet);
   }
 
@@ -450,18 +451,6 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     this.eventBus.register(this);
     this.serviceLauncher.start();
 
-    // Wait until spec consumer service is running to set scheduler to active
-    if (this.configuration.isWarmStandbyEnabled()) {
-      while (!this.specStoreChangeMonitor.isRunning()) {
-        try {
-          LOGGER.info("Waiting for SpecStoreChangeMonitor to be started...");
-          Thread.sleep(10);
-        } catch (InterruptedException e) {
-          LOGGER.warn("Interrupted while waiting for SpecStoreChangeMonitor to 
be started");
-        }
-      }
-    }
-
     if (this.helixManager.isPresent()) {
       // Subscribe to leadership changes
       this.helixManager.get().addControllerListener((ControllerChangeListener) 
this::handleLeadershipChange);
@@ -537,6 +526,12 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       this.dagManager.setActive(true);
       this.eventBus.register(this.dagManager);
     }
+
+    // Activate both monitors last as they're dependent on the SpecCompiler, 
Scheduler, and DagManager being active
+    if (configuration.isWarmStandbyEnabled()) {
+      this.specStoreChangeMonitor.setActive();
+      this.dagActionStoreChangeMonitor.setActive();
+    }
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 40bbb06bf..c9258c013 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -99,6 +99,8 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   @VisibleForTesting
   protected FlowCatalog flowCatalog;
   protected DagActionStore dagActionStore;
+  @Getter
+  private volatile boolean isActive;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
@@ -147,20 +149,34 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   }
 
   /*
-   Override this method to do the same sequence as the parent class, except 
create metrics. Instead, we create metrics
-   earlier upon class initialization because they are used immediately as dag 
actions are loaded and processed from
-   the DagActionStore.
+   Override this method to do nothing, instead we create metrics upon class 
initialization and start processing the
+   queues and load dag actions from the DagActionStore after #setActive is 
called to make sure dependent services are
+   initialized properly.
   */
   @Override
-  protected void startUp() {
-    // Method that starts threads that processes queues
-    processQueues();
-    // Main thread that constantly polls messages from kafka
-    consumerExecutor.execute(() -> {
-      while (!shutdownRequested) {
-        consume();
-      }
-    });
+  protected void startUp() {}
+
+  /*
+   This method should be called once by the {@link GobblinServiceManager} only 
after the DagManager, FlowGraph and
+   SpecCompiler are initialized and running.
+   */
+  public synchronized void setActive() {
+    if (this.isActive) {
+      return;
+    }
+
+    if (isActive) {
+      this.isActive = true;
+      initializeMonitor();
+      // Method that starts threads that processes queues
+      processQueues();
+      // Main thread that constantly polls messages from kafka
+      consumerExecutor.execute(() -> {
+        while (!shutdownRequested) {
+          consume();
+        }
+      });
+    }
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 6277360e9..767f2b76d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -74,7 +74,6 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
   @Override
   public DagActionStoreChangeMonitor get() {
     DagActionStoreChangeMonitor changeMonitor = createDagActionStoreMonitor();
-    changeMonitor.initializeMonitor();
     return changeMonitor;
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index af5146027..53b1f6836 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -74,7 +74,6 @@ public class DagManagementDagActionStoreChangeMonitorFactory 
implements Provider
   @Override
   public DagActionStoreChangeMonitor get() {
     DagActionStoreChangeMonitor changeMonitor = createDagActionStoreMonitor();
-    changeMonitor.initializeMonitor();
     return changeMonitor;
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 8b197a352..86cb911d1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -78,6 +78,7 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer 
{
   protected FlowCatalog flowCatalog;
 
   protected GobblinServiceJobScheduler scheduler;
+  private volatile boolean isActive;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
@@ -97,6 +98,28 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
     return;
   }
 
+  /*
+ Override this method to do nothing, instead only start processing the queues 
after #setActive is called to make sure
+ dependent services are initialized properly.
+*/
+  @Override
+  protected void startUp() {}
+
+  /*
+   This method should be called once by the {@link GobblinServiceManager} only 
after the Scheduler is active to ensure
+   calls to onAddSpec don't fail specCompilation.
+   */
+  public synchronized void setActive() {
+    if (this.isActive) {
+      return;
+    }
+
+    if (isActive) {
+      this.isActive = true;
+      super.startUp();
+    }
+  }
+
   @Override
   /*
   Note that although this class is multi-threaded and will call this message 
for multiple threads (each having a queue

Reply via email to