This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 756c5801a9 [GOBBLIN-2227] Make dag action and spec store monitor
initialization config driven (#4141)
756c5801a9 is described below
commit 756c5801a902f96547d40ce4166b5eda2576a815
Author: Vivek Rai <[email protected]>
AuthorDate: Fri Sep 19 11:55:18 2025 +0530
[GOBBLIN-2227] Make dag action and spec store monitor initialization config
driven (#4141)
* make consumer initialization config driven
* add header lines
* added java docs
---
.../modules/core/GobblinServiceGuiceModule.java | 12 ++++----
.../modules/core/GobblinServiceManager.java | 12 ++++----
.../service/monitoring/DagActionChangeMonitor.java | 33 ++++++++++++++++++++++
.../monitoring/DagActionStoreChangeMonitor.java | 3 +-
...nagementDagActionStoreChangeMonitorFactory.java | 28 ++++++++++++------
.../service/monitoring/JobStatusMonitor.java | 27 ++++++++++++++++++
.../service/monitoring/KafkaJobStatusMonitor.java | 2 +-
.../monitoring/KafkaJobStatusMonitorFactory.java | 11 ++++----
.../service/monitoring/SpecChangeMonitor.java | 33 ++++++++++++++++++++++
.../service/monitoring/SpecStoreChangeMonitor.java | 3 +-
.../monitoring/SpecStoreChangeMonitorFactory.java | 20 ++++++++-----
11 files changed, 148 insertions(+), 36 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index b78db5b1f7..9e2ba6f79e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -82,15 +82,15 @@ import
org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import
org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
-import
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.DagActionChangeMonitor;
import
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
+import org.apache.gobblin.service.monitoring.JobStatusMonitor;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
-import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
-import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.SpecChangeMonitor;
import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitorFactory;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
@@ -159,7 +159,7 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(DagActionReminderScheduler.class);
binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
- binder.bind(DagManagementDagActionStoreChangeMonitor.class).toProvider(
+ binder.bind(DagActionChangeMonitor.class).toProvider(
DagManagementDagActionStoreChangeMonitorFactory.class).in(Singleton.class);
binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
binder.bind(DagManagementStateStore.class).to(MySqlDagManagementStateStore.class);
@@ -169,7 +169,7 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(DagProcessingEngineMetrics.class);
binder.bind(FlowLaunchHandler.class);
binder.bind(MultiActiveLeaseArbiter.class).toProvider(DagActionProcessingMultiActiveLeaseArbiterFactory.class);
-
binder.bind(SpecStoreChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
+
binder.bind(SpecChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
binder.bind(RequesterService.class).to(NoopRequesterService.class);
binder.bind(SharedFlowMetricsSingleton.class);
@@ -188,7 +188,7 @@ public class GobblinServiceGuiceModule implements Module {
}
if (serviceConfig.isJobStatusMonitorEnabled()) {
-
binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
+
binder.bind(JobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
binder.bind(ErrorClassifier.class);
binder.bind(ErrorPatternStore.class)
.to(getClassByNameOrAlias(ErrorPatternStore.class,
serviceConfig.getInnerConfig(),
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index d36afdeeec..9bab575350 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -77,11 +77,11 @@ import
org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler;
import org.apache.gobblin.service.modules.restli.FlowExecutionResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
-import
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.DagActionChangeMonitor;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
-import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
-import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.JobStatusMonitor;
+import org.apache.gobblin.service.monitoring.SpecChangeMonitor;
import org.apache.gobblin.util.ConfigUtils;
@@ -150,7 +150,7 @@ public class GobblinServiceManager implements
ApplicationLauncher {
protected GitConfigMonitor gitConfigMonitor;
@Inject(optional = true)
- protected KafkaJobStatusMonitor jobStatusMonitor;
+ protected JobStatusMonitor jobStatusMonitor;
@Inject
protected MultiContextIssueRepository issueRepository;
@@ -166,10 +166,10 @@ public class GobblinServiceManager implements
ApplicationLauncher {
protected D2Announcer d2Announcer;
@Inject(optional = true)
- protected SpecStoreChangeMonitor specStoreChangeMonitor;
+ protected SpecChangeMonitor specStoreChangeMonitor;
@Inject(optional = true)
- protected DagManagementDagActionStoreChangeMonitor
dagManagementDagActionStoreChangeMonitor;
+ protected DagActionChangeMonitor dagManagementDagActionStoreChangeMonitor;
@Inject(optional = true)
protected DagProcessingEngine dagProcessingEngine;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionChangeMonitor.java
new file mode 100644
index 0000000000..fd5d216844
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionChangeMonitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.util.concurrent.Service;
+
+
+/**
+ * A marker interface for dag action change monitors to generalize
initialization in {@link
org.apache.gobblin.service.modules.core.GobblinServiceManager#dagManagementDagActionStoreChangeMonitor}
+ */
+public interface DagActionChangeMonitor extends Service {
+
+ /**
+ * Set the monitor to active state where it can start processing events.
+ * Should be called from {@link
org.apache.gobblin.service.modules.core.GobblinServiceManager} after the
service is started.
+ */
+ void setActive();
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 5b35b0bf7d..9a7a6c80e4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -59,7 +59,7 @@ import org.apache.gobblin.util.ExecutorsUtils;
* connector between the API and execution layers of GaaS.
*/
@Slf4j
-public abstract class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagActionStoreChangeEvent> {
+public abstract class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagActionStoreChangeEvent> implements
DagActionChangeMonitor {
public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX =
"dagActionChangeStore";
// Metrics
@@ -166,6 +166,7 @@ public abstract class DagActionStoreChangeMonitor extends
HighLevelConsumer<Stri
This method should be called once by the {@link GobblinServiceManager} only
after the FlowGraph and
SpecCompiler are initialized and running.
*/
+ @Override
public synchronized void setActive() {
if (this.isActive) {
return;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index 0c79fc437a..ec4c62f77d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -30,14 +30,17 @@ import
org.apache.gobblin.service.modules.orchestration.DagManagement;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
- * A factory implementation that returns a {@link
DagManagementDagActionStoreChangeMonitor} instance.
+ * A factory implementation that returns a {@link DagActionChangeMonitor}
instance.
*/
@Slf4j
-public class DagManagementDagActionStoreChangeMonitorFactory implements
Provider<DagManagementDagActionStoreChangeMonitor> {
- static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY =
"numThreads";
+public class DagManagementDagActionStoreChangeMonitorFactory implements
Provider<DagActionChangeMonitor> {
+ private static final String DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_KEY =
"class";
+ private static final String DEFAULT_DAG_ACTION_STORE_CHANGE_MONITOR_CLASS =
DagManagementDagActionStoreChangeMonitor.class.getName();
+ private static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY
= "numThreads";
private final Config config;
private final DagManagementStateStore dagManagementStateStore;
@@ -55,18 +58,25 @@ public class
DagManagementDagActionStoreChangeMonitorFactory implements Provider
this.dagProcEngineMetrics = dagProcEngineMetrics;
}
- private DagManagementDagActionStoreChangeMonitor
createDagActionStoreMonitor() {
+ private DagActionChangeMonitor createDagActionStoreMonitor() throws
ReflectiveOperationException {
Config dagActionStoreChangeConfig =
this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
- log.info("DagActionStore will be initialized with config {}",
dagActionStoreChangeConfig);
+ Class<?> dagActionStoreChangeMonitorClass = Class.forName(
+ ConfigUtils.getString(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_KEY,
DEFAULT_DAG_ACTION_STORE_CHANGE_MONITOR_CLASS));
+ log.info("DagActionStore `{}` will be initialized with config {}",
dagActionStoreChangeMonitorClass, dagActionStoreChangeConfig);
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
- return new
DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig,
numThreads, dagManagementStateStore,
- this.dagManagement, this.dagActionReminderScheduler,
dagProcEngineMetrics);
+ return (DagActionChangeMonitor)
GobblinConstructorUtils.invokeLongestConstructor(dagActionStoreChangeMonitorClass,
+ dagActionStoreChangeConfig, numThreads, dagManagementStateStore,
this.dagManagement,
+ this.dagActionReminderScheduler, dagProcEngineMetrics);
}
@Override
- public DagManagementDagActionStoreChangeMonitor get() {
- return createDagActionStoreMonitor();
+ public DagActionChangeMonitor get() {
+ try {
+ return createDagActionStoreMonitor();
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Failed to initialize
DagActionStoreChangeMonitor due to ", e);
+ }
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/JobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/JobStatusMonitor.java
new file mode 100644
index 0000000000..1ca6016daa
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/JobStatusMonitor.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.util.concurrent.Service;
+
+
+/**
+ * A marker interface for job status monitors to generalize initialization in
{@link
org.apache.gobblin.service.modules.core.GobblinServiceManager#jobStatusMonitor}
+ */
+public interface JobStatusMonitor extends Service {
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index be46b844a4..f7b202874d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -89,7 +89,7 @@ import static
org.apache.gobblin.util.retry.RetryerFactory.RetryType;
* a {@link FileContextBasedFsStateStore}.
*/
@Slf4j
-public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[],
byte[]> {
+public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[],
byte[]> implements JobStatusMonitor {
public static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
//We use table suffix that is different from the Gobblin job state store
suffix of jst to avoid confusion.
//gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 86ceaecfc7..23b46a7745 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -43,7 +43,7 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
* A factory implementation that returns a {@link KafkaJobStatusMonitor}
instance.
*/
@Slf4j
-public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMonitor> {
+public class KafkaJobStatusMonitorFactory implements
Provider<JobStatusMonitor> {
private static final String KAFKA_SSL_CONFIG_PREFIX_KEY =
"jobStatusMonitor.kafka.config";
private static final String DEFAULT_KAFKA_SSL_CONFIG_PREFIX =
"metrics.reporting.kafka.config";
@@ -65,7 +65,7 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
this.errorClassifier = errorClassifier;
}
- private KafkaJobStatusMonitor createJobStatusMonitor()
+ private JobStatusMonitor createJobStatusMonitor()
throws ReflectiveOperationException {
Config jobStatusConfig =
config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX);
@@ -94,18 +94,19 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
GaaSJobObservabilityEventProducer.DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS));
GaaSJobObservabilityEventProducer observabilityEventProducer =
(GaaSJobObservabilityEventProducer)
GobblinConstructorUtils.invokeLongestConstructor(
observabilityEventProducerClassName,
ConfigUtils.configToState(config), this.issueRepository,
this.instrumentationEnabled);
+ log.info("JobStatusMonitor class `{}` will be initialized with config {}",
jobStatusMonitorClass, jobStatusConfig);
- return (KafkaJobStatusMonitor) GobblinConstructorUtils
+ return (JobStatusMonitor) GobblinConstructorUtils
.invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
dagManagementStateStore, errorClassifier);
}
@Override
- public KafkaJobStatusMonitor get() {
+ public JobStatusMonitor get() {
try {
return createJobStatusMonitor();
} catch (ReflectiveOperationException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Failed to initialize JobStatusMonitor due to
", e);
}
}
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecChangeMonitor.java
new file mode 100644
index 0000000000..feefe6a8c6
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecChangeMonitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.util.concurrent.Service;
+
+
+/**
+ * A marker interface for spec change monitors to generalize initialization in
{@link
org.apache.gobblin.service.modules.core.GobblinServiceManager#specStoreChangeMonitor}
+ */
+public interface SpecChangeMonitor extends Service {
+
+ /**
+ * Set the monitor to active state where it can start processing events.
+ * Should be called from {@link
org.apache.gobblin.service.modules.core.GobblinServiceManager} after the
service is started.
+ */
+ void setActive();
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 5c183ea326..c15d43d3e8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -51,7 +51,7 @@ import
org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
* a connector between the API and execution layers of GaaS.
*/
@Slf4j
-public class SpecStoreChangeMonitor extends HighLevelConsumer<String,
GenericStoreChangeEvent> {
+public class SpecStoreChangeMonitor extends HighLevelConsumer<String,
GenericStoreChangeEvent> implements SpecChangeMonitor {
public static final String SPEC_STORE_CHANGE_MONITOR_PREFIX =
"specStoreChangeMonitor";
// Metrics
@@ -108,6 +108,7 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer<String, GenericSto
This method should be called once by the {@link GobblinServiceManager} only
after the Scheduler is active to ensure
calls to onAddSpec don't fail specCompilation.
*/
+ @Override
public synchronized void setActive() {
if (this.isActive) {
return;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
index 4d27713688..025b3c2996 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
@@ -28,14 +28,17 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
- * A factory implementation that returns a {@link SpecStoreChangeMonitor}
instance.
+ * A factory implementation that returns a {@link SpecChangeMonitor} instance.
*/
@Slf4j
-public class SpecStoreChangeMonitorFactory implements
Provider<SpecStoreChangeMonitor> {
- static final String SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";
+public class SpecStoreChangeMonitorFactory implements
Provider<SpecChangeMonitor> {
+ private static final String SPEC_STORE_CHANGE_MONITOR_CLASS_KEY = "class";
+ private static final String DEFAULT_SPEC_STORE_CHANGE_MONITOR_CLASS =
SpecStoreChangeMonitor.class.getName();
+ private static final String SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY =
"numThreads";
private final Config config;
private FlowCatalog flowCatalog;
@@ -48,19 +51,22 @@ public class SpecStoreChangeMonitorFactory implements
Provider<SpecStoreChangeMo
this.scheduler = scheduler;
}
- private SpecStoreChangeMonitor createSpecStoreChangeMonitor()
+ private SpecChangeMonitor createSpecStoreChangeMonitor()
throws ReflectiveOperationException {
Config specStoreChangeConfig =
this.config.getConfig(SpecStoreChangeMonitor.SPEC_STORE_CHANGE_MONITOR_PREFIX);
- log.info("SpecStoreChangeMonitor will be initialized with config {}",
specStoreChangeConfig);
+ Class<?> specStoreChangeMonitorClass = Class.forName(
+ ConfigUtils.getString(specStoreChangeConfig,
SPEC_STORE_CHANGE_MONITOR_CLASS_KEY, DEFAULT_SPEC_STORE_CHANGE_MONITOR_CLASS));
+ log.info("SpecStoreChangeMonitor class `{}` will be initialized with
config {}", specStoreChangeMonitorClass, specStoreChangeConfig);
String topic = ""; // Pass empty string because we expect underlying
client to dynamically determine the Kafka topic
int numThreads = ConfigUtils.getInt(specStoreChangeConfig,
SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
- return new SpecStoreChangeMonitor(topic, specStoreChangeConfig,
this.flowCatalog, this.scheduler, numThreads);
+ return (SpecChangeMonitor)
GobblinConstructorUtils.invokeLongestConstructor(specStoreChangeMonitorClass,
topic,
+ specStoreChangeConfig, this.flowCatalog, this.scheduler, numThreads);
}
@Override
- public SpecStoreChangeMonitor get() {
+ public SpecChangeMonitor get() {
try {
return createSpecStoreChangeMonitor();
} catch (ReflectiveOperationException e) {