Repository: incubator-gobblin Updated Branches: refs/heads/master 648c2a2ab -> a6ec4a9af
[GOBBLIN-584] Improve the Helix configuration naming. Closes #2450 from yukuai518/distmetrics Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a6ec4a9a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a6ec4a9a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a6ec4a9a Branch: refs/heads/master Commit: a6ec4a9af528039b69f6c7f33f968723214c557b Parents: 648c2a2 Author: Kuai Yu <[email protected]> Authored: Wed Sep 12 13:14:21 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Sep 12 13:14:21 2018 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 9 ++---- .../GobblinClusterConfigurationKeys.java | 7 +++++ ...blinHelixDistributeJobExecutionLauncher.java | 29 ++++++++++++++------ .../cluster/GobblinHelixJobLauncher.java | 24 +++++++++++----- .../org/apache/gobblin/cluster/HelixUtils.java | 3 +- .../apache/gobblin/util/PropertiesUtils.java | 4 +++ 6 files changed, 52 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 47233e8..35a3a45 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -112,13 +112,8 @@ public class ConfigurationKeys { public static final String SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY = "scheduler.wait.for.job.completion"; public static final String 
DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION = Boolean.TRUE.toString(); - public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = "job.timeout.enabled"; - public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false"; - public static final String HELIX_JOB_TIMEOUT_SECONDS = "job.timeout.seconds"; - public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800"; - - public static final String HELIX_TASK_TIMEOUT_SECONDS = "task.timeout.seconds"; - public static final long DEFAULT_HELIX_TASK_TIMEOUT_SECONDS = 60 * 60; + public static final String TASK_TIMEOUT_SECONDS = "task.timeout.seconds"; + public static final long DEFAULT_TASK_TIMEOUT_SECONDS = 60 * 60; /** * Task executor and state tracker configuration properties. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index b6c11e3..2b1ba09 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -118,4 +118,11 @@ public class GobblinClusterConfigurationKeys { public static final long DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = 6 * 60 * 60; public static final String TASK_RUNNER_SUITE_BUILDER = GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder"; + + public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = "helix.job.timeout.enabled"; + public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false"; + public static final String HELIX_JOB_TIMEOUT_SECONDS = "helix.job.timeout.seconds"; + public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800"; + 
public static final String HELIX_TASK_TIMEOUT_SECONDS = "helix.task.timeout.seconds"; + public static final String HELIX_MAX_TASK_RETRIES_KEY = "helix.task.maxretries"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index b5f8928..eb78938 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -184,6 +184,14 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher /** * Create a job config builder which has a single task that wraps the original jobProps. + * + * The planning job (which runs the original {@link GobblinHelixJobLauncher}) will be + * executed on one of the Helix participants. + * + * We rely on the underlying {@link GobblinHelixJobLauncher} to correctly handle the task + * execution timeout so that the planning job itself is relieved of the timeout constraint. + * + * In short, the planning job will run once and requires no timeout. 
*/ private JobConfig.Builder createPlanningJob (Properties jobProps) { // Create a single task for job planning @@ -199,10 +207,15 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher taskConfigMap.put(planningId, TaskConfig.Builder.from(rawConfigMap)); JobConfig.Builder jobConfigBuilder = new JobConfig.Builder(); - jobConfigBuilder.setTimeoutPerTask(PropertiesUtils.getPropAsLong( - jobProps, - ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS, - ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000); + // We want GobblinHelixJobLauncher to run only once. + jobConfigBuilder.setMaxAttemptsPerTask(1); + + // The planning job should effectively never time out (Helix defaults to a 1h per-task timeout, so use a large value: 30 days) + jobConfigBuilder.setTimeoutPerTask(JobConfig.DEFAULT_TIMEOUT_PER_TASK * 24 * 30); + + jobConfigBuilder.setNumConcurrentTasksPerInstance(PropertiesUtils.getPropAsInt(jobProps, + GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY, + GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT)); jobConfigBuilder.setFailureThreshold(1); jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_JOB_FACTORY_NAME); @@ -252,10 +265,10 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } private DistributeJobResult waitForJobCompletion(String workFlowName, String jobName) throws InterruptedException { - boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, - ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); - long timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, - ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS)); + boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); + long 
timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS)); try { HelixUtils.waitJobCompletion( http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 6672923..8523e21 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -269,12 +269,20 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { } JobConfig.Builder jobConfigBuilder = new JobConfig.Builder(); + + // Helix task attempts = retries + 1 (fallback to general task retry for backward compatibility) jobConfigBuilder.setMaxAttemptsPerTask(this.jobContext.getJobState().getPropAsInt( - ConfigurationKeys.MAX_TASK_RETRIES_KEY, ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)); + GobblinClusterConfigurationKeys.HELIX_MAX_TASK_RETRIES_KEY, + this.jobContext.getJobState().getPropAsInt( + ConfigurationKeys.MAX_TASK_RETRIES_KEY, + ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1); + // Helix task timeout (fallback to general task timeout for backward compatibility) jobConfigBuilder.setTimeoutPerTask(this.jobContext.getJobState().getPropAsLong( - ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS, - ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000); + GobblinClusterConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS, + this.jobContext.getJobState().getPropAsLong( + ConfigurationKeys.TASK_TIMEOUT_SECONDS, + ConfigurationKeys.DEFAULT_TASK_TIMEOUT_SECONDS)) * 1000); 
jobConfigBuilder.setFailureThreshold(workUnits.size()); jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME); @@ -377,10 +385,12 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { } private void waitForJobCompletion() throws InterruptedException { - boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, - ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); - long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, - ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS)); + boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty( + GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); + long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty( + GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS)); try { HelixUtils.waitJobCompletion( http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 452e421..6fbc7c5 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -176,12 +176,11 @@ public class HelixUtils { static void handleJobTimeout(String workFlowName, String jobName, HelixManager helixManager, Object jobLauncher, JobListener jobListener) throws InterruptedException { try { + log.warn("Timeout occurred for job launcher {} with job {}", jobLauncher.getClass(), jobName); 
if (jobLauncher instanceof GobblinHelixJobLauncher) { ((GobblinHelixJobLauncher) jobLauncher).cancelJob(jobListener); } else if (jobLauncher instanceof GobblinHelixDistributeJobExecutionLauncher) { ((GobblinHelixDistributeJobExecutionLauncher) jobLauncher).cancel(); - } else { - log.warn("Timeout occured for unknown job launcher {}", jobLauncher.getClass()); } } catch (JobException e) { throw new RuntimeException("Unable to cancel job " + jobName + ": ", e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6ec4a9a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java index c41273c..51f2019 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java @@ -61,6 +61,10 @@ public class PropertiesUtils { return Boolean.valueOf(properties.getProperty(key, defaultValue)); } + public static int getPropAsInt(Properties properties, String key, int defaultValue) { + return Integer.parseInt(properties.getProperty(key, Integer.toString(defaultValue))); + } + public static long getPropAsLong(Properties properties, String key, long defaultValue) { return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue))); }
