This is an automated email from the ASF dual-hosted git repository.

kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9029a89  [GOBBLIN-1141] add support for common job properties in helix job scheduler
9029a89 is described below

commit 9029a89b85ef373f78d603b14d6aaa75998f3356
Author: Arjun <[email protected]>
AuthorDate: Mon May 4 15:59:18 2020 -0700

    [GOBBLIN-1141] add support for common job properties in helix job scheduler
    
    add support for common job properties in helix job
    scheduler
    
    address review comments
    
    address review comments
    
    Closes #2977 from arjun4084346/useClusterConfigs
---
 .../gobblin/cluster/GobblinClusterManager.java       | 19 +++++++++----------
 .../gobblin/cluster/GobblinHelixJobScheduler.java    | 20 ++++++++++----------
 2 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 2c3f49f..011db74 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -140,25 +140,25 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   @Getter
   protected final Config config;
 
-  public GobblinClusterManager(String clusterName, String applicationId, Config config,
+  public GobblinClusterManager(String clusterName, String applicationId, Config sysConfig,
       Optional<Path> appWorkDirOptional) throws Exception {
     this.clusterName = clusterName;
-    this.config = config;
-    this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
+    this.config = sysConfig;
+    this.isStandaloneMode = ConfigUtils.getBoolean(sysConfig, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
         GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
 
     this.applicationId = applicationId;
 
-    //Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
+    // Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
     // overrides such as sessionTimeout. In this case, the overrides specified
     // in the application configuration have to be extracted and set before initializing HelixManager.
-    HelixUtils.setSystemProperties(config);
+    HelixUtils.setSystemProperties(sysConfig);
 
     initializeHelixManager();
 
-    this.fs = GobblinClusterUtils.buildFileSystem(config, new Configuration());
+    this.fs = GobblinClusterUtils.buildFileSystem(sysConfig, new Configuration());
     this.appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
-        : GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, clusterName, applicationId);
+        : GobblinClusterUtils.getAppWorkDirPathFromConfig(sysConfig, this.fs, clusterName, applicationId);
     LOGGER.info("Configured GobblinClusterManager work dir to: {}", this.appWorkDir);
 
     initializeAppLauncherAndServices();
@@ -361,10 +361,9 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   /**
    * Build the {@link GobblinHelixJobScheduler} for the Application Master.
    */
-  private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config config, Path appWorkDir,
+  private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config sysConfig, Path appWorkDir,
       List<? extends Tag<?>> metadataTags, SchedulerService schedulerService) throws Exception {
-    Properties properties = ConfigUtils.configToProperties(config);
-    return new GobblinHelixJobScheduler(properties,
+    return new GobblinHelixJobScheduler(sysConfig,
         this.multiManager.getJobClusterHelixManager(),
         this.multiManager.getTaskDriverHelixManager(),
         this.eventBus,
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 20d0a3d..43438d8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -87,8 +87,9 @@ import org.apache.gobblin.util.PropertiesUtils;
 public class GobblinHelixJobScheduler extends JobScheduler implements StandardMetricsBridge{
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobScheduler.class);
+  private static final String COMMON_JOB_PROPS = "gobblin.common.job.props";
 
-  private final Properties properties;
+  private final Properties commonJobProperties;
   private final HelixManager jobHelixManager;
   private final Optional<HelixManager> taskDriverHelixManager;
   private final EventBus eventBus;
@@ -109,7 +110,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
 
-  public GobblinHelixJobScheduler(Properties properties,
+  public GobblinHelixJobScheduler(Config sysConfig,
                                   HelixManager jobHelixManager,
                                   Optional<HelixManager> taskDriverHelixManager,
                                   EventBus eventBus,
@@ -117,8 +118,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
                                   SchedulerService schedulerService,
                                   MutableJobCatalog jobCatalog) throws Exception {
 
-    super(properties, schedulerService);
-    this.properties = properties;
+    super(ConfigUtils.configToProperties(sysConfig), schedulerService);
+    this.commonJobProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig, COMMON_JOB_PROPS));
     this.jobHelixManager = jobHelixManager;
     this.taskDriverHelixManager = taskDriverHelixManager;
     this.eventBus = eventBus;
@@ -128,8 +129,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     this.jobCatalog = jobCatalog;
     this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(properties), this.getClass());
 
-    Config jobConfig = ConfigUtils.propertiesToConfig(this.properties);
-    int metricsWindowSizeInMin = ConfigUtils.getInt(jobConfig,
+    int metricsWindowSizeInMin = ConfigUtils.getInt(sysConfig,
                                                     
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
                                                     
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
 
@@ -155,10 +155,10 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
     this.startServicesCompleted = false;
 
-    this.helixJobStopTimeoutMillis = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
+    this.helixJobStopTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS) * 1000;
 
-    this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
+    this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) * 1000;
 
   }
@@ -304,10 +304,10 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     String jobUri = newJobArrival.getJobName();
     LOGGER.info("Received new job configuration of job " + jobUri);
     try {
-      Properties jobProps = new Properties();
+      Properties jobProps = new Properties(this.commonJobProperties);
       jobProps.putAll(newJobArrival.getJobConfig());
 
-      // set uri so that we can delete it later
+      // set uri so that we can delete this job later
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);

Reply via email to