This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e637c0  [GOBBLIN-779] make job status retriever configurable
0e637c0 is described below

commit 0e637c0a355c0a506934996abeecdf1ffe75928f
Author: Arjun <[email protected]>
AuthorDate: Wed May 22 20:37:58 2019 -0700

    [GOBBLIN-779] make job status retriever configurable
    
    Closes #2643 from arjun4084346/jobStatusRetriever
---
 .../gobblin/service/modules/orchestration/DagManager.java    | 12 ++----------
 .../gobblin/service/monitoring/KafkaJobStatusMonitor.java    |  1 -
 2 files changed, 2 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2e2360b..5210778 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -154,18 +154,10 @@ public class DagManager extends AbstractIdleService {
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
     this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
     this.instrumentationEnabled = instrumentationEnabled;
-    boolean jobStatusMonitorEnabled =
-        ConfigUtils.getBoolean(config, 
KafkaJobStatusMonitor.JOB_STATUS_MONITOR_ENABLED_KEY, true);
 
     try {
-      Class jobStatusRetrieverClass;
-      if (jobStatusMonitorEnabled) {
-        jobStatusRetrieverClass = 
Class.forName(DEFAULT_JOB_STATUS_RETRIEVER_CLASS);
-        this.jobStatusMonitor = new 
KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
-      } else {
-        jobStatusRetrieverClass = 
Class.forName(config.getString(JOB_STATUS_RETRIEVER_CLASS_KEY));
-        this.jobStatusMonitor = null;
-      }
+      Class jobStatusRetrieverClass = 
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, 
DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
+      this.jobStatusMonitor = new 
KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
       this.jobStatusRetriever =
           (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, 
config);
     } catch (ReflectiveOperationException e) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c50705c..99c862a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -54,7 +54,6 @@ import org.apache.gobblin.util.ConfigUtils;
 @Slf4j
 public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], 
byte[]> {
   static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
-  public static final String JOB_STATUS_MONITOR_ENABLED_KEY =  
JOB_STATUS_MONITOR_PREFIX + ".enabled";
   //We use table suffix that is different from the Gobblin job state store 
suffix of jst to avoid confusion.
   //gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
   public static final String STATE_STORE_TABLE_SUFFIX = "gst";

Reply via email to