This is an automated email from the ASF dual-hosted git repository.
kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9029a89 [GOBBLIN-1141] add support for common job properties in helix job scheduler
9029a89 is described below
commit 9029a89b85ef373f78d603b14d6aaa75998f3356
Author: Arjun <[email protected]>
AuthorDate: Mon May 4 15:59:18 2020 -0700
[GOBBLIN-1141] add support for common job properties in helix job scheduler
add support for common job properties in helix job
scheduler
address review comments
address review comments
Closes #2977 from arjun4084346/useClusterConfigs
---
.../gobblin/cluster/GobblinClusterManager.java | 19 +++++++++----------
.../gobblin/cluster/GobblinHelixJobScheduler.java | 20 ++++++++++----------
2 files changed, 19 insertions(+), 20 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 2c3f49f..011db74 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -140,25 +140,25 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
@Getter
protected final Config config;
- public GobblinClusterManager(String clusterName, String applicationId,
Config config,
+ public GobblinClusterManager(String clusterName, String applicationId,
Config sysConfig,
Optional<Path> appWorkDirOptional) throws Exception {
this.clusterName = clusterName;
- this.config = config;
- this.isStandaloneMode = ConfigUtils.getBoolean(config,
GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
+ this.config = sysConfig;
+ this.isStandaloneMode = ConfigUtils.getBoolean(sysConfig,
GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
this.applicationId = applicationId;
- //Set system properties passed in via application config. As an example,
Helix uses System#getProperty() for ZK configuration
+ // Set system properties passed in via application config. As an example,
Helix uses System#getProperty() for ZK configuration
// overrides such as sessionTimeout. In this case, the overrides specified
// in the application configuration have to be extracted and set before
initializing HelixManager.
- HelixUtils.setSystemProperties(config);
+ HelixUtils.setSystemProperties(sysConfig);
initializeHelixManager();
- this.fs = GobblinClusterUtils.buildFileSystem(config, new Configuration());
+ this.fs = GobblinClusterUtils.buildFileSystem(sysConfig, new
Configuration());
this.appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
- : GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs,
clusterName, applicationId);
+ : GobblinClusterUtils.getAppWorkDirPathFromConfig(sysConfig, this.fs,
clusterName, applicationId);
LOGGER.info("Configured GobblinClusterManager work dir to: {}",
this.appWorkDir);
initializeAppLauncherAndServices();
@@ -361,10 +361,9 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
/**
* Build the {@link GobblinHelixJobScheduler} for the Application Master.
*/
- private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config
config, Path appWorkDir,
+ private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config
sysConfig, Path appWorkDir,
List<? extends Tag<?>> metadataTags, SchedulerService schedulerService)
throws Exception {
- Properties properties = ConfigUtils.configToProperties(config);
- return new GobblinHelixJobScheduler(properties,
+ return new GobblinHelixJobScheduler(sysConfig,
this.multiManager.getJobClusterHelixManager(),
this.multiManager.getTaskDriverHelixManager(),
this.eventBus,
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 20d0a3d..43438d8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -87,8 +87,9 @@ import org.apache.gobblin.util.PropertiesUtils;
public class GobblinHelixJobScheduler extends JobScheduler implements
StandardMetricsBridge{
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinHelixJobScheduler.class);
+ private static final String COMMON_JOB_PROPS = "gobblin.common.job.props";
- private final Properties properties;
+ private final Properties commonJobProperties;
private final HelixManager jobHelixManager;
private final Optional<HelixManager> taskDriverHelixManager;
private final EventBus eventBus;
@@ -109,7 +110,7 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
- public GobblinHelixJobScheduler(Properties properties,
+ public GobblinHelixJobScheduler(Config sysConfig,
HelixManager jobHelixManager,
Optional<HelixManager>
taskDriverHelixManager,
EventBus eventBus,
@@ -117,8 +118,8 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
SchedulerService schedulerService,
MutableJobCatalog jobCatalog) throws
Exception {
- super(properties, schedulerService);
- this.properties = properties;
+ super(ConfigUtils.configToProperties(sysConfig), schedulerService);
+ this.commonJobProperties =
ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig,
COMMON_JOB_PROPS));
this.jobHelixManager = jobHelixManager;
this.taskDriverHelixManager = taskDriverHelixManager;
this.eventBus = eventBus;
@@ -128,8 +129,7 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
this.jobCatalog = jobCatalog;
this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(properties), this.getClass());
- Config jobConfig = ConfigUtils.propertiesToConfig(this.properties);
- int metricsWindowSizeInMin = ConfigUtils.getInt(jobConfig,
+ int metricsWindowSizeInMin = ConfigUtils.getInt(sysConfig,
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
@@ -155,10 +155,10 @@ public class GobblinHelixJobScheduler extends
JobScheduler implements StandardMe
this.startServicesCompleted = false;
- this.helixJobStopTimeoutMillis = ConfigUtils.getLong(jobConfig,
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
+ this.helixJobStopTimeoutMillis = ConfigUtils.getLong(sysConfig,
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS) * 1000;
- this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(jobConfig,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
+ this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS)
* 1000;
}
@@ -304,10 +304,10 @@ public class GobblinHelixJobScheduler extends
JobScheduler implements StandardMe
String jobUri = newJobArrival.getJobName();
LOGGER.info("Received new job configuration of job " + jobUri);
try {
- Properties jobProps = new Properties();
+ Properties jobProps = new Properties(this.commonJobProperties);
jobProps.putAll(newJobArrival.getJobConfig());
- // set uri so that we can delete it later
+ // set uri so that we can delete this job later
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);