This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0e637c0 [GOBBLIN-779] make job status retriever configurable
0e637c0 is described below
commit 0e637c0a355c0a506934996abeecdf1ffe75928f
Author: Arjun <[email protected]>
AuthorDate: Wed May 22 20:37:58 2019 -0700
[GOBBLIN-779] make job status retriever configurable
Closes #2643 from arjun4084346/jobStatusRetriever
---
.../gobblin/service/modules/orchestration/DagManager.java | 12 ++----------
.../gobblin/service/monitoring/KafkaJobStatusMonitor.java | 1 -
2 files changed, 2 insertions(+), 11 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2e2360b..5210778 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -154,18 +154,10 @@ public class DagManager extends AbstractIdleService {
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
- boolean jobStatusMonitorEnabled =
- ConfigUtils.getBoolean(config,
KafkaJobStatusMonitor.JOB_STATUS_MONITOR_ENABLED_KEY, true);
try {
- Class jobStatusRetrieverClass;
- if (jobStatusMonitorEnabled) {
- jobStatusRetrieverClass =
Class.forName(DEFAULT_JOB_STATUS_RETRIEVER_CLASS);
- this.jobStatusMonitor = new
KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
- } else {
- jobStatusRetrieverClass =
Class.forName(config.getString(JOB_STATUS_RETRIEVER_CLASS_KEY));
- this.jobStatusMonitor = null;
- }
+ Class jobStatusRetrieverClass =
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY,
DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
+ this.jobStatusMonitor = new
KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
this.jobStatusRetriever =
(JobStatusRetriever)
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass,
config);
} catch (ReflectiveOperationException e) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c50705c..99c862a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -54,7 +54,6 @@ import org.apache.gobblin.util.ConfigUtils;
@Slf4j
public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[],
byte[]> {
static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
- public static final String JOB_STATUS_MONITOR_ENABLED_KEY =
JOB_STATUS_MONITOR_PREFIX + ".enabled";
//We use table suffix that is different from the Gobblin job state store suffix of jst to avoid confusion.
//gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
public static final String STATE_STORE_TABLE_SUFFIX = "gst";