This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 756c5801a9 [GOBBLIN-2227] Make dag action and spec store monitor 
initialization config driven (#4141)
756c5801a9 is described below

commit 756c5801a902f96547d40ce4166b5eda2576a815
Author: Vivek Rai <[email protected]>
AuthorDate: Fri Sep 19 11:55:18 2025 +0530

    [GOBBLIN-2227] Make dag action and spec store monitor initialization config 
driven (#4141)
    
    * make consumer initialization config driven
    
    * add header lines
    
    * added java docs
---
 .../modules/core/GobblinServiceGuiceModule.java    | 12 ++++----
 .../modules/core/GobblinServiceManager.java        | 12 ++++----
 .../service/monitoring/DagActionChangeMonitor.java | 33 ++++++++++++++++++++++
 .../monitoring/DagActionStoreChangeMonitor.java    |  3 +-
 ...nagementDagActionStoreChangeMonitorFactory.java | 28 ++++++++++++------
 .../service/monitoring/JobStatusMonitor.java       | 27 ++++++++++++++++++
 .../service/monitoring/KafkaJobStatusMonitor.java  |  2 +-
 .../monitoring/KafkaJobStatusMonitorFactory.java   | 11 ++++----
 .../service/monitoring/SpecChangeMonitor.java      | 33 ++++++++++++++++++++++
 .../service/monitoring/SpecStoreChangeMonitor.java |  3 +-
 .../monitoring/SpecStoreChangeMonitorFactory.java  | 20 ++++++++-----
 11 files changed, 148 insertions(+), 36 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index b78db5b1f7..9e2ba6f79e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -82,15 +82,15 @@ import 
org.apache.gobblin.service.modules.topology.TopologySpecFactory;
 import 
org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
-import 
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.DagActionChangeMonitor;
 import 
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitorFactory;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.service.monitoring.GitConfigMonitor;
+import org.apache.gobblin.service.monitoring.JobStatusMonitor;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
-import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
-import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.SpecChangeMonitor;
 import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitorFactory;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
@@ -159,7 +159,7 @@ public class GobblinServiceGuiceModule implements Module {
 
     binder.bind(DagActionReminderScheduler.class);
     binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
-    binder.bind(DagManagementDagActionStoreChangeMonitor.class).toProvider(
+    binder.bind(DagActionChangeMonitor.class).toProvider(
         
DagManagementDagActionStoreChangeMonitorFactory.class).in(Singleton.class);
     binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
     
binder.bind(DagManagementStateStore.class).to(MySqlDagManagementStateStore.class);
@@ -169,7 +169,7 @@ public class GobblinServiceGuiceModule implements Module {
     binder.bind(DagProcessingEngineMetrics.class);
     binder.bind(FlowLaunchHandler.class);
     
binder.bind(MultiActiveLeaseArbiter.class).toProvider(DagActionProcessingMultiActiveLeaseArbiterFactory.class);
-    
binder.bind(SpecStoreChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
+    
binder.bind(SpecChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
 
     binder.bind(RequesterService.class).to(NoopRequesterService.class);
     binder.bind(SharedFlowMetricsSingleton.class);
@@ -188,7 +188,7 @@ public class GobblinServiceGuiceModule implements Module {
     }
 
     if (serviceConfig.isJobStatusMonitorEnabled()) {
-      
binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
+      
binder.bind(JobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
       binder.bind(ErrorClassifier.class);
       binder.bind(ErrorPatternStore.class)
           .to(getClassByNameOrAlias(ErrorPatternStore.class, 
serviceConfig.getInnerConfig(),
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index d36afdeeec..9bab575350 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -77,11 +77,11 @@ import 
org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler;
 import org.apache.gobblin.service.modules.restli.FlowExecutionResourceHandler;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
-import 
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.DagActionChangeMonitor;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.GitConfigMonitor;
-import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
-import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.JobStatusMonitor;
+import org.apache.gobblin.service.monitoring.SpecChangeMonitor;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -150,7 +150,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher {
   protected GitConfigMonitor gitConfigMonitor;
 
   @Inject(optional = true)
-  protected KafkaJobStatusMonitor jobStatusMonitor;
+  protected JobStatusMonitor jobStatusMonitor;
 
   @Inject
   protected MultiContextIssueRepository issueRepository;
@@ -166,10 +166,10 @@ public class GobblinServiceManager implements 
ApplicationLauncher {
   protected D2Announcer d2Announcer;
 
   @Inject(optional = true)
-  protected SpecStoreChangeMonitor specStoreChangeMonitor;
+  protected SpecChangeMonitor specStoreChangeMonitor;
 
   @Inject(optional = true)
-  protected DagManagementDagActionStoreChangeMonitor 
dagManagementDagActionStoreChangeMonitor;
+  protected DagActionChangeMonitor dagManagementDagActionStoreChangeMonitor;
 
   @Inject(optional = true)
   protected DagProcessingEngine dagProcessingEngine;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionChangeMonitor.java
new file mode 100644
index 0000000000..fd5d216844
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionChangeMonitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.util.concurrent.Service;
+
+
+/**
+ * A marker interface for dag action change monitors to generalize 
initialization in {@link 
org.apache.gobblin.service.modules.core.GobblinServiceManager#dagManagementDagActionStoreChangeMonitor}
+ */
+public interface DagActionChangeMonitor extends Service {
+
+  /**
+   * Set the monitor to active state where it can start processing events.
+   * Should be called from {@link 
org.apache.gobblin.service.modules.core.GobblinServiceManager} after the 
service is started.
+   */
+  void setActive();
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 5b35b0bf7d..9a7a6c80e4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -59,7 +59,7 @@ import org.apache.gobblin.util.ExecutorsUtils;
  * connector between the API and execution layers of GaaS.
  */
 @Slf4j
-public abstract class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagActionStoreChangeEvent> {
+public abstract class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagActionStoreChangeEvent> implements 
DagActionChangeMonitor {
   public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = 
"dagActionChangeStore";
 
   // Metrics
@@ -166,6 +166,7 @@ public abstract class DagActionStoreChangeMonitor extends 
HighLevelConsumer<Stri
    This method should be called once by the {@link GobblinServiceManager} only 
after the FlowGraph and
    SpecCompiler are initialized and running.
    */
+  @Override
   public synchronized void setActive() {
     if (this.isActive) {
       return;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index 0c79fc437a..ec4c62f77d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -30,14 +30,17 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManagement;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
- * A factory implementation that returns a {@link 
DagManagementDagActionStoreChangeMonitor} instance.
+ * A factory implementation that returns a {@link DagActionChangeMonitor} 
instance.
  */
 @Slf4j
-public class DagManagementDagActionStoreChangeMonitorFactory implements 
Provider<DagManagementDagActionStoreChangeMonitor> {
-  static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = 
"numThreads";
+public class DagManagementDagActionStoreChangeMonitorFactory implements 
Provider<DagActionChangeMonitor> {
+  private static final String DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_KEY = 
"class";
+  private static final String DEFAULT_DAG_ACTION_STORE_CHANGE_MONITOR_CLASS = 
DagManagementDagActionStoreChangeMonitor.class.getName();
+  private static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY 
= "numThreads";
 
   private final Config config;
   private final DagManagementStateStore dagManagementStateStore;
@@ -55,18 +58,25 @@ public class 
DagManagementDagActionStoreChangeMonitorFactory implements Provider
     this.dagProcEngineMetrics = dagProcEngineMetrics;
   }
 
-  private DagManagementDagActionStoreChangeMonitor 
createDagActionStoreMonitor() {
+  private DagActionChangeMonitor createDagActionStoreMonitor() throws 
ReflectiveOperationException {
     Config dagActionStoreChangeConfig = 
this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
-    log.info("DagActionStore will be initialized with config {}", 
dagActionStoreChangeConfig);
+    Class<?> dagActionStoreChangeMonitorClass = Class.forName(
+        ConfigUtils.getString(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_KEY, 
DEFAULT_DAG_ACTION_STORE_CHANGE_MONITOR_CLASS));
+    log.info("DagActionStore `{}` will be initialized with config {}", 
dagActionStoreChangeMonitorClass, dagActionStoreChangeConfig);
 
     int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
-    return new 
DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig, 
numThreads, dagManagementStateStore,
-        this.dagManagement, this.dagActionReminderScheduler, 
dagProcEngineMetrics);
+    return (DagActionChangeMonitor) 
GobblinConstructorUtils.invokeLongestConstructor(dagActionStoreChangeMonitorClass,
+        dagActionStoreChangeConfig, numThreads, dagManagementStateStore, 
this.dagManagement,
+        this.dagActionReminderScheduler, dagProcEngineMetrics);
   }
 
   @Override
-  public DagManagementDagActionStoreChangeMonitor get() {
-    return createDagActionStoreMonitor();
+  public DagActionChangeMonitor get() {
+    try {
+      return createDagActionStoreMonitor();
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Failed to initialize 
DagActionStoreChangeMonitor due to ", e);
+    }
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/JobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/JobStatusMonitor.java
new file mode 100644
index 0000000000..1ca6016daa
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/JobStatusMonitor.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.util.concurrent.Service;
+
+
+/**
+ * A marker interface for job status monitors to generalize initialization in 
{@link 
org.apache.gobblin.service.modules.core.GobblinServiceManager#jobStatusMonitor}
+ */
+public interface JobStatusMonitor extends Service {
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index be46b844a4..f7b202874d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -89,7 +89,7 @@ import static 
org.apache.gobblin.util.retry.RetryerFactory.RetryType;
  * a {@link FileContextBasedFsStateStore}.
  */
 @Slf4j
-public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], 
byte[]> {
+public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], 
byte[]> implements JobStatusMonitor {
   public static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
   //We use table suffix that is different from the Gobblin job state store 
suffix of jst to avoid confusion.
   //gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 86ceaecfc7..23b46a7745 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -43,7 +43,7 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  * A factory implementation that returns a {@link KafkaJobStatusMonitor} 
instance.
  */
 @Slf4j
-public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMonitor> {
+public class KafkaJobStatusMonitorFactory implements 
Provider<JobStatusMonitor> {
   private static final String KAFKA_SSL_CONFIG_PREFIX_KEY = 
"jobStatusMonitor.kafka.config";
   private static final String DEFAULT_KAFKA_SSL_CONFIG_PREFIX = 
"metrics.reporting.kafka.config";
 
@@ -65,7 +65,7 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
     this.errorClassifier = errorClassifier;
   }
 
-  private KafkaJobStatusMonitor createJobStatusMonitor()
+  private JobStatusMonitor createJobStatusMonitor()
       throws ReflectiveOperationException {
     Config jobStatusConfig = 
config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX);
 
@@ -94,18 +94,19 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
         
GaaSJobObservabilityEventProducer.DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS));
     GaaSJobObservabilityEventProducer observabilityEventProducer = 
(GaaSJobObservabilityEventProducer) 
GobblinConstructorUtils.invokeLongestConstructor(
         observabilityEventProducerClassName, 
ConfigUtils.configToState(config), this.issueRepository, 
this.instrumentationEnabled);
+    log.info("JobStatusMonitor class `{}` will be initialized with config {}", 
jobStatusMonitorClass, jobStatusConfig);
 
-    return (KafkaJobStatusMonitor) GobblinConstructorUtils
+    return (JobStatusMonitor) GobblinConstructorUtils
         .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
             dagManagementStateStore, errorClassifier);
   }
 
   @Override
-  public KafkaJobStatusMonitor get() {
+  public JobStatusMonitor get() {
     try {
       return createJobStatusMonitor();
     } catch (ReflectiveOperationException e) {
-      throw new RuntimeException(e);
+      throw new RuntimeException("Failed to initialize JobStatusMonitor due to 
", e);
     }
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecChangeMonitor.java
new file mode 100644
index 0000000000..feefe6a8c6
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecChangeMonitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.util.concurrent.Service;
+
+
+/**
+ * A marker interface for spec change monitors to generalize initialization in 
{@link 
org.apache.gobblin.service.modules.core.GobblinServiceManager#specStoreChangeMonitor}
+ */
+public interface SpecChangeMonitor extends Service {
+
+  /**
+   * Set the monitor to active state where it can start processing events.
+   * Should be called from {@link 
org.apache.gobblin.service.modules.core.GobblinServiceManager} after the 
service is started.
+   */
+  void setActive();
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 5c183ea326..c15d43d3e8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -51,7 +51,7 @@ import 
org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
  * a connector between the API and execution layers of GaaS.
  */
 @Slf4j
-public class SpecStoreChangeMonitor extends HighLevelConsumer<String, 
GenericStoreChangeEvent> {
+public class SpecStoreChangeMonitor extends HighLevelConsumer<String, 
GenericStoreChangeEvent> implements SpecChangeMonitor {
   public static final String SPEC_STORE_CHANGE_MONITOR_PREFIX = 
"specStoreChangeMonitor";
 
   // Metrics
@@ -108,6 +108,7 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer<String, GenericSto
    This method should be called once by the {@link GobblinServiceManager} only 
after the Scheduler is active to ensure
    calls to onAddSpec don't fail specCompilation.
    */
+  @Override
   public synchronized void setActive() {
     if (this.isActive) {
       return;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
index 4d27713688..025b3c2996 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
@@ -28,14 +28,17 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
- * A factory implementation that returns a {@link SpecStoreChangeMonitor} 
instance.
+ * A factory implementation that returns a {@link SpecChangeMonitor} instance.
  */
 @Slf4j
-public class SpecStoreChangeMonitorFactory implements 
Provider<SpecStoreChangeMonitor> {
-  static final String SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";
+public class SpecStoreChangeMonitorFactory implements 
Provider<SpecChangeMonitor> {
+  private static final String SPEC_STORE_CHANGE_MONITOR_CLASS_KEY = "class";
+  private static final String DEFAULT_SPEC_STORE_CHANGE_MONITOR_CLASS = 
SpecStoreChangeMonitor.class.getName();
+  private static final String SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = 
"numThreads";
 
   private final Config config;
   private FlowCatalog flowCatalog;
@@ -48,19 +51,22 @@ public class SpecStoreChangeMonitorFactory implements 
Provider<SpecStoreChangeMo
     this.scheduler = scheduler;
   }
 
-  private SpecStoreChangeMonitor createSpecStoreChangeMonitor()
+  private SpecChangeMonitor createSpecStoreChangeMonitor()
       throws ReflectiveOperationException {
     Config specStoreChangeConfig = 
this.config.getConfig(SpecStoreChangeMonitor.SPEC_STORE_CHANGE_MONITOR_PREFIX);
-    log.info("SpecStoreChangeMonitor will be initialized with config {}", 
specStoreChangeConfig);
+    Class<?> specStoreChangeMonitorClass = Class.forName(
+        ConfigUtils.getString(specStoreChangeConfig, 
SPEC_STORE_CHANGE_MONITOR_CLASS_KEY, DEFAULT_SPEC_STORE_CHANGE_MONITOR_CLASS));
+    log.info("SpecStoreChangeMonitor class `{}` will be initialized with 
config {}", specStoreChangeMonitorClass, specStoreChangeConfig);
 
     String topic = ""; // Pass empty string because we expect underlying 
client to dynamically determine the Kafka topic
     int numThreads = ConfigUtils.getInt(specStoreChangeConfig, 
SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
-    return new SpecStoreChangeMonitor(topic, specStoreChangeConfig, 
this.flowCatalog, this.scheduler, numThreads);
+    return (SpecChangeMonitor) 
GobblinConstructorUtils.invokeLongestConstructor(specStoreChangeMonitorClass, 
topic,
+        specStoreChangeConfig, this.flowCatalog, this.scheduler, numThreads);
   }
 
   @Override
-  public SpecStoreChangeMonitor get() {
+  public SpecChangeMonitor get() {
     try {
       return createSpecStoreChangeMonitor();
     } catch (ReflectiveOperationException e) {

Reply via email to