This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1ca494435 [GOBBLIN-2037] Start DagActionMonitor functionality after
its dependencies (#3916)
1ca494435 is described below
commit 1ca4944359fae3a7348a56427a2c38555811e977
Author: umustafi <[email protected]>
AuthorDate: Fri Apr 5 15:11:52 2024 -0700
[GOBBLIN-2037] Start DagActionMonitor functionality after its dependencies
(#3916)
* Start DagActionMonitor functionality after its dependencies
* Startup specStoreChangeMonitor later as well
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../modules/core/GobblinServiceManager.java | 21 +++++-------
.../monitoring/DagActionStoreChangeMonitor.java | 40 +++++++++++++++-------
.../DagActionStoreChangeMonitorFactory.java | 1 -
...nagementDagActionStoreChangeMonitorFactory.java | 1 -
.../service/monitoring/SpecStoreChangeMonitor.java | 23 +++++++++++++
5 files changed, 59 insertions(+), 27 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index a5e8f102e..02b969a53 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -271,7 +271,8 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
throw new RuntimeException(String.format("getClass called to obtain %s
without calling create method to "
+ "initialize GobblinServiceGuiceModule.", classToGet));
}
- Injector injector = Guice.createInjector(Stage.PRODUCTION,
GOBBLIN_SERVICE_GUICE_MODULE);
+ // Use development stage to enable more verbose error messages and runtime
checks
+ Injector injector = Guice.createInjector(Stage.DEVELOPMENT,
GOBBLIN_SERVICE_GUICE_MODULE);
return injector.getInstance(classToGet);
}
@@ -450,18 +451,6 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
this.eventBus.register(this);
this.serviceLauncher.start();
- // Wait until spec consumer service is running to set scheduler to active
- if (this.configuration.isWarmStandbyEnabled()) {
- while (!this.specStoreChangeMonitor.isRunning()) {
- try {
- LOGGER.info("Waiting for SpecStoreChangeMonitor to be started...");
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.warn("Interrupted while waiting for SpecStoreChangeMonitor to
be started");
- }
- }
- }
-
if (this.helixManager.isPresent()) {
// Subscribe to leadership changes
this.helixManager.get().addControllerListener((ControllerChangeListener)
this::handleLeadershipChange);
@@ -537,6 +526,12 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
this.dagManager.setActive(true);
this.eventBus.register(this.dagManager);
}
+
+ // Activate both monitors last as they're dependent on the SpecCompiler,
Scheduler, and DagManager being active
+ if (configuration.isWarmStandbyEnabled()) {
+ this.specStoreChangeMonitor.setActive();
+ this.dagActionStoreChangeMonitor.setActive();
+ }
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 40bbb06bf..c9258c013 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -99,6 +99,8 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
@VisibleForTesting
protected FlowCatalog flowCatalog;
protected DagActionStore dagActionStore;
+ @Getter
+ private volatile boolean isActive;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
@@ -147,20 +149,34 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
}
/*
- Override this method to do the same sequence as the parent class, except
create metrics. Instead, we create metrics
- earlier upon class initialization because they are used immediately as dag
actions are loaded and processed from
- the DagActionStore.
+ Override this method to do nothing. Metrics are created upon class initialization, while processing of the
+ queues and loading of dag actions from the DagActionStore only start after #setActive is called, to make sure
+ dependent services are initialized properly.
*/
@Override
- protected void startUp() {
- // Method that starts threads that processes queues
- processQueues();
- // Main thread that constantly polls messages from kafka
- consumerExecutor.execute(() -> {
- while (!shutdownRequested) {
- consume();
- }
- });
+ protected void startUp() {}
+
+  /*
+   Activates the monitor: initializes it, starts the queue-processing threads and begins polling Kafka. Should be
+   called by the {@link GobblinServiceManager} only after the DagManager, FlowGraph and SpecCompiler are running.
+   */
+  public synchronized void setActive() {
+    // Idempotent: a repeated call must not initialize or start the workers a second time
+    if (this.isActive) {
+      return;
+    }
+
+    // Only reached while the monitor is still inactive, so activate unconditionally
+    this.isActive = true;
+    initializeMonitor();
+    // Start the threads that process the queues
+    processQueues();
+    // Main thread that constantly polls messages from kafka
+    consumerExecutor.execute(() -> {
+      while (!shutdownRequested) {
+        consume();
+      }
+    });
   }
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 6277360e9..767f2b76d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -74,7 +74,6 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
@Override
public DagActionStoreChangeMonitor get() {
DagActionStoreChangeMonitor changeMonitor = createDagActionStoreMonitor();
- changeMonitor.initializeMonitor();
return changeMonitor;
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index af5146027..53b1f6836 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -74,7 +74,6 @@ public class DagManagementDagActionStoreChangeMonitorFactory
implements Provider
@Override
public DagActionStoreChangeMonitor get() {
DagActionStoreChangeMonitor changeMonitor = createDagActionStoreMonitor();
- changeMonitor.initializeMonitor();
return changeMonitor;
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 8b197a352..86cb911d1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -78,6 +78,7 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer
{
protected FlowCatalog flowCatalog;
protected GobblinServiceJobScheduler scheduler;
+ private volatile boolean isActive;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
@@ -97,6 +98,28 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
return;
}
+ /*
+ Override this method to do nothing; processing of the queues only starts after #setActive is called, to make
+ sure dependent services are initialized properly.
+*/
+ @Override
+ protected void startUp() {}
+
+  /*
+   Activates the monitor by running the parent class start-up sequence that was skipped in #startUp. Should be
+   called by the {@link GobblinServiceManager} only after the Scheduler is active, so that calls to onAddSpec
+   don't fail specCompilation.
+   */
+  public synchronized void setActive() {
+    // Idempotent: a repeated call must not run the parent start-up sequence a second time
+    if (this.isActive) {
+      return;
+    }
+
+    // Only reached while the monitor is still inactive, so activate unconditionally
+    this.isActive = true;
+    super.startUp();
+  }
+
@Override
/*
Note that although this class is multi-threaded and will call this message
for multiple threads (each having a queue