umustafi commented on code in PR #3841:
URL: https://github.com/apache/gobblin/pull/3841#discussion_r1416401050


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -85,19 +89,24 @@ public String load(String key) throws Exception {
   @Getter
   @VisibleForTesting
   protected FlowCatalog flowCatalog;
+  protected DagActionStore dagActionStore;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically rather than through the config.
   public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads,
-      FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) {
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore dagActionStore,
+      boolean isMultiActiveSchedulerEnabled) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
         ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
         numThreads);
     this.dagManager = dagManager;
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
+    this.dagActionStore = dagActionStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+
+    initializeMonitor();

Review Comment:
   Good point. I didn't intend to block processMessage calls while
initialization runs. Moved.
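
   For illustration only, here is a minimal sketch of the general idea of
keeping initialization out of the constructor. It is not the actual Gobblin
change: `ChangeMonitorSketch`, `storedActions`, `getHandled`, and the plain
string actions are hypothetical stand-ins, and it assumes the owner of the
monitor calls `initializeMonitor()` explicitly after construction.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the monitor; not the real DagActionStoreChangeMonitor.
class ChangeMonitorSketch {
  // Stand-in for the contents of a DagActionStore (assumption: plain strings).
  private final List<String> storedActions;
  private final List<String> handled = new ArrayList<>();
  private boolean initialized = false;

  // The constructor only wires dependencies; it performs no store reads.
  ChangeMonitorSketch(List<String> storedActions) {
    this.storedActions = new ArrayList<>(storedActions);
  }

  // Explicit initialization step, invoked by the owner after construction.
  void initializeMonitor() {
    for (String action : storedActions) {
      handled.add(action);
    }
    initialized = true;
  }

  boolean isInitialized() {
    return initialized;
  }

  // Read-only view of the actions handled during initialization.
  List<String> getHandled() {
    return Collections.unmodifiableList(handled);
  }
}
```

   With this shape, constructing the object stays cheap, and the caller
decides when the potentially slow load from the store happens.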



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.