Repository: incubator-gobblin Updated Branches: refs/heads/master 92f4255bf -> 754b06696
[GOBBLIN-661] Prevent jobs resubmission after manager failure Closes #2532 from kyuamazon/catalog Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/754b0669 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/754b0669 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/754b0669 Branch: refs/heads/master Commit: 754b066960e9cb1b9fd4c93f2238d3efc056b550 Parents: 92f4255 Author: Kuai Yu <[email protected]> Authored: Mon Jan 14 11:04:06 2019 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Jan 14 11:04:06 2019 -0800 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 11 + ...blinHelixDistributeJobExecutionLauncher.java | 13 +- .../cluster/GobblinHelixJobScheduler.java | 84 ++++---- .../gobblin/cluster/GobblinHelixJobTask.java | 31 ++- .../cluster/GobblinHelixMultiManager.java | 49 +++-- .../GobblinHelixPlanningJobLauncherMetrics.java | 23 ++- .../gobblin/cluster/HelixJobsMapping.java | 30 ++- .../cluster/HelixRetriggeringJobCallable.java | 202 +++++++++++++------ .../org/apache/gobblin/cluster/HelixUtils.java | 38 ++-- .../TaskRunnerSuiteForJobFactoryTest.java | 6 +- .../job_catalog/NonObservingFSJobCatalog.java | 3 +- 11 files changed, 327 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 6791d52..faeaf17 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -90,6 +90,9 @@ public class GobblinClusterConfigurationKeys { public static final String PLANNING_ID_KEY = PLANNING_CONF_PREFIX + "idKey"; public static final String PLANNING_JOB_CREATE_TIME = PLANNING_CONF_PREFIX + "createTime"; + // Actual job properties + public static final String ACTUAL_JOB_NAME_PREFIX = "ActualJob"; + // job spec operation public static final String JOB_ALWAYS_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.alwaysDelete"; @@ -114,6 +117,8 @@ public class GobblinClusterConfigurationKeys { public static final String JOB_CONFIGURATION_MANAGER_KEY = GOBBLIN_CLUSTER_PREFIX + "job.configuration.manager"; public static final String JOB_SPEC_REFRESH_INTERVAL = GOBBLIN_CLUSTER_PREFIX + "job.spec.refresh.interval"; + public static final String JOB_SPEC_URI = GOBBLIN_CLUSTER_PREFIX + "job.spec.uri"; + public static final String SPEC_CONSUMER_CLASS_KEY = GOBBLIN_CLUSTER_PREFIX + "specConsumer.class"; public static final String DEFAULT_SPEC_CONSUMER_CLASS = "org.apache.gobblin.service.SimpleKafkaSpecConsumer"; @@ -143,4 +148,10 @@ public class GobblinClusterConfigurationKeys { public static final String HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "workflowDeleteTimeoutSeconds"; public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300; + public static final String CLEAN_ALL_DIST_JOBS = GOBBLIN_CLUSTER_PREFIX + "bootup.clean.dist.jobs"; + public static final boolean DEFAULT_CLEAN_ALL_DIST_JOBS = false; + + public static final String KILL_DUPLICATE_PLANNING_JOB = GOBBLIN_CLUSTER_PREFIX + "kill.duplicate.planningJob"; + public static final boolean DEFAULT_KILL_DUPLICATE_PLANNING_JOB = true; + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index 9424ca8..b14b700 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -122,16 +122,12 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps) .withFallback(ConfigUtils.propertiesToConfig(sysProps)); - this.jobsMapping = new HelixJobsMapping(combined, - PathUtils.getRootPath(builder.appWorkDir).toUri(), - builder.appWorkDir.toString()); - this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS); this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics; this.helixMetrics = builder.helixMetrics; - + this.jobsMapping = builder.jobsMapping; this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(combined, GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS); @@ -172,6 +168,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher Path appWorkDir; GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; GobblinHelixMetrics helixMetrics; + HelixJobsMapping jobsMapping; public GobblinHelixDistributeJobExecutionLauncher build() throws Exception { return new GobblinHelixDistributeJobExecutionLauncher(this); } @@ -288,9 +285,11 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } private DistributeJobResult waitForJobCompletion(String workFlowName, String jobName) throws InterruptedException { - boolean timeoutEnabled = Boolean.parseBoolean(this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, + boolean timeoutEnabled = Boolean.parseBoolean(this.jobPlanningProps.getProperty( + GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); - long timeoutInSeconds = Long.parseLong(this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, + long timeoutInSeconds = Long.parseLong(this.jobPlanningProps.getProperty( + GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS)); try { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index 9cf757c..79b088b 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -17,8 +17,6 @@ package org.apache.gobblin.cluster; -import java.net.URI; -import java.net.URISyntaxException; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -28,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; @@ -38,6 +37,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.Striped; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent; @@ -54,6 +54,7 @@ import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.scheduler.JobScheduler; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.PropertiesUtils; @@ -91,6 +92,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics; final GobblinHelixJobLauncherMetrics launcherMetrics; final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; + final HelixJobsMapping jobsMapping; + final Striped<Lock> locks = Striped.lazyWeakLock(256); private boolean startServicesCompleted; @@ -125,9 +128,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.metricContext, metricsWindowSizeInMin); + this.jobsMapping = new HelixJobsMapping(ConfigUtils.propertiesToConfig(properties), + PathUtils.getRootPath(appWorkDir).toUri(), + appWorkDir.toString()); + this.planningJobLauncherMetrics = new GobblinHelixPlanningJobLauncherMetrics("planningLauncherInScheduler", this.metricContext, - metricsWindowSizeInMin); + metricsWindowSizeInMin, this.jobsMapping); this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobScheduler", this.metricContext, @@ -171,11 +178,24 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override protected void startServices() throws Exception { + + boolean cleanAllDistJobs = PropertiesUtils.getPropAsBoolean(this.properties, + GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS, + String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_CLEAN_ALL_DIST_JOBS)); + + if (cleanAllDistJobs) { + for (org.apache.gobblin.configuration.State state : this.jobsMapping.getAllStates()) { + String jobName = state.getId(); + LOGGER.info("Delete mapping for job " + jobName); + this.jobsMapping.deleteMapping(jobName); + } + } } @Override public void runJob(Properties jobProps, JobListener jobListener) throws JobException { new HelixRetriggeringJobCallable(this, + this.jobCatalog, this.properties, jobProps, jobListener, @@ -183,7 +203,9 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.helixMetrics, this.appWorkDir, this.jobHelixManager, - this.taskDriverHelixManager).call(); + this.taskDriverHelixManager, + this.jobsMapping, + this.locks).call(); } @Override @@ -203,6 +225,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) { HelixRetriggeringJobCallable retriggeringJob = new HelixRetriggeringJobCallable(this, + this.jobCatalog, this.properties, jobProps, jobListener, @@ -210,7 +233,9 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.helixMetrics, this.appWorkDir, this.jobHelixManager, - this.taskDriverHelixManager); + this.taskDriverHelixManager, + this.jobsMapping, + this.locks); final Future<?> future = this.jobExecutor.submit(retriggeringJob); return new Future() { @@ -256,24 +281,28 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Subscribe public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { - LOGGER.info("Received new job configuration of job " + newJobArrival.getJobName()); + String jobUri = newJobArrival.getJobName(); + LOGGER.info("Received new job configuration of job " + jobUri); try { Properties jobProps = new Properties(); jobProps.putAll(newJobArrival.getJobConfig()); + // set uri so that we can delete it later + jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri); + this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps); if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { - LOGGER.info("Scheduling job " + newJobArrival.getJobName()); + LOGGER.info("Scheduling job " + jobUri); scheduleJob(jobProps, new GobblinHelixJobLauncherListener(this.launcherMetrics)); } else { - LOGGER.info("No job schedule found, so running job " + newJobArrival.getJobName()); - this.jobExecutor.execute(new NonScheduledJobRunner(newJobArrival.getJobName(), jobProps, + LOGGER.info("No job schedule found, so running job " + jobUri); + this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, new GobblinHelixJobLauncherListener(this.launcherMetrics))); } } catch (JobException je) { - LOGGER.error("Failed to schedule or run job " + newJobArrival.getJobName(), je); + LOGGER.error("Failed to schedule or run job " + jobUri, je); } } @@ -308,59 +337,26 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe * This class is responsible for running non-scheduled jobs. */ class NonScheduledJobRunner implements Runnable { - - private final String jobUri; private final Properties jobProps; private final GobblinHelixJobLauncherListener jobListener; private final Long creationTimeInMillis; - public NonScheduledJobRunner(String jobUri, - Properties jobProps, + public NonScheduledJobRunner(Properties jobProps, GobblinHelixJobLauncherListener jobListener) { - this.jobUri = jobUri; this.jobProps = jobProps; this.jobListener = jobListener; this.creationTimeInMillis = System.currentTimeMillis(); } - private void deleteJobSpec(boolean alwaysDelete, boolean isDeleted) { - if (alwaysDelete && !isDeleted) { - try { - GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri)); - } catch (URISyntaxException e) { - LOGGER.error("Always delete " + jobUri + ". Failed to remove job with bad uri " + jobUri, e); - } - } - } - @Override public void run() { - boolean alwaysDelete = PropertiesUtils.getPropAsBoolean(this.jobProps, - GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE, - "false"); - boolean isDeleted = false; - try { GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBeforeJobLaunching(this.jobProps); GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis()); GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener); - - // remove non-scheduled job catalog once done so it won't be re-executed - if (GobblinHelixJobScheduler.this.jobCatalog != null) { - try { - GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri)); - isDeleted = true; - } catch (URISyntaxException e) { - LOGGER.error("Failed to remove job with bad uri " + jobUri, e); - } - } } catch (JobException je) { - deleteJobSpec(alwaysDelete, isDeleted); LOGGER.error("Failed to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); - } catch (Exception e) { - deleteJobSpec(alwaysDelete, isDeleted); - throw e; } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java index ff61ea6..aa2d7fa 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java @@ -65,6 +65,7 @@ class GobblinHelixJobTask implements Task { private final String planningJobId; private final HelixManager jobHelixManager; private final Path appWorkDir; + private final String jobName; private final List<? extends Tag<?>> metadataTags; private GobblinHelixJobLauncher launcher; private GobblinHelixJobTaskMetrics jobTaskMetrics; @@ -99,6 +100,7 @@ class GobblinHelixJobTask implements Task { throw new RuntimeException("Job doesn't have planning ID"); } + this.jobName = jobPlusSysConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY); this.planningJobId = jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); this.jobsMapping = jobsMapping; this.appWorkDir = builder.getAppWorkPath(); @@ -147,8 +149,6 @@ class GobblinHelixJobTask implements Task { this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig); try (Closer closer = Closer.create()) { - String jobName = jobPlusSysConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY); - Optional<String> planningIdFromStateStore = this.jobsMapping.getPlanningJobId(jobName); long timeOut = PropertiesUtils.getPropAsLong(jobPlusSysConfig, @@ -171,14 +171,19 @@ class GobblinHelixJobTask implements Task { try { HelixUtils.deleteWorkflow(previousActualJobId, this.jobHelixManager, timeOut); } catch (HelixException e) { - log.error("Helix cannot delete previous actual job id {} within 5 min.", previousActualJobId); + log.error("Helix cannot delete previous actual job id {} within {} seconds.", previousActualJobId, timeOut / 1000); return new TaskResult(TaskResult.Status.FAILED, ExceptionUtils.getFullStackTrace(e)); } } } else { - log.info("Actual job {} does not exist. First time run.", this.planningJobId); + log.info("No previous actual job [plan: {}]. First time run.", this.planningJobId); } + String actualJobId = HelixJobsMapping.createActualJobId(jobPlusSysConfig); + log.info("Planning job {} creates actual job {}", this.planningJobId, actualJobId); + + this.jobPlusSysConfig.setProperty(ConfigurationKeys.JOB_ID_KEY, actualJobId); + this.launcher = createJobLauncher(); this.jobsMapping.setActualJobId(jobName, this.planningJobId, this.launcher.getJobId()); @@ -191,13 +196,20 @@ class GobblinHelixJobTask implements Task { log.info("Planning job {} has more runs due to early stop.", this.planningJobId); } } + log.info("Completing planning job {}", this.planningJobId); + return new TaskResult(TaskResult.Status.COMPLETED, ""); } catch (Exception e) { log.info("Failing planning job {}", this.planningJobId); return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils .getFullStackTrace(e)); + } finally { + // always cleanup the job mapping for current job name. + try { + this.jobsMapping.deleteMapping(jobName); + } catch (Exception e) { + return new TaskResult(TaskResult.Status.FAILED,"Cannot delete jobs mapping for job : " + jobName); + } } - log.info("Completing planning job {}", this.planningJobId); - return new TaskResult(TaskResult.Status.COMPLETED, ""); } @Override @@ -208,6 +220,13 @@ class GobblinHelixJobTask implements Task { launcher.cancelJob(this.jobLauncherListener); } catch (JobException e) { throw new RuntimeException("Unable to cancel planning job " + this.planningJobId + ": ", e); + } finally { + // always cleanup the job mapping for current job name. + try { + this.jobsMapping.deleteMapping(jobName); + } catch (Exception e) { + throw new RuntimeException("Cannot delete jobs mapping for job : " + jobName); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java index 81ed7ce..03ae69f 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java @@ -205,7 +205,6 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { InstanceType.CONTROLLER)); } - // This will creat a dedicated controller for planning job distribution if (this.dedicatedTaskDriverCluster) { // This will create a Helix administrator to dispatch jobs to ZooKeeper this.taskDriverHelixManager = Optional.of(buildHelixManager(this.config, @@ -218,6 +217,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_CONTROLLER_ENABLED, true); + // This will creat a dedicated controller for planning job distribution if (this.dedicatedTaskDriverClusterController) { this.taskDriverClusterController = Optional.of(GobblinHelixMultiManager .buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, @@ -339,20 +339,10 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { if (!isLeader) { log.info("New Helix Controller leader {}", this.managerClusterHelixManager.getInstanceName()); - // Clean up existing jobs - TaskDriver taskDriver = new TaskDriver(this.jobClusterHelixManager); - Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows(); + cleanUpJobs(this.jobClusterHelixManager); - for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) { - String queueName = entry.getKey(); - WorkflowConfig workflowConfig = entry.getValue(); - - // request delete if not already requested - if (workflowConfig.getTargetState() != TargetState.DELETE) { - taskDriver.delete(queueName); - - log.info("Requested delete of queue {}", queueName); - } + if (this.taskDriverHelixManager.isPresent()) { + cleanUpJobs(this.taskDriverHelixManager.get()); } for (LeadershipChangeAwareComponent c: this.leadershipChangeAwareComponents) { @@ -373,6 +363,37 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { } } + private void cleanUpJobs(HelixManager helixManager) { + // Clean up existing jobs + TaskDriver taskDriver = new TaskDriver(helixManager); + + Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows(); + + boolean cleanupDistJobs = ConfigUtils.getBoolean(this.config, + GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS, + GobblinClusterConfigurationKeys.DEFAULT_CLEAN_ALL_DIST_JOBS); + + for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) { + String workflowName = entry.getKey(); + + if (workflowName.contains(GobblinClusterConfigurationKeys.PLANNING_CONF_PREFIX) + || workflowName.contains(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX)) { + if (!cleanupDistJobs) { + log.info("Distributed job {} won't be deleted.", workflowName); + continue; + } + } + + WorkflowConfig workflowConfig = entry.getValue(); + + // request delete if not already requested + if (workflowConfig.getTargetState() != TargetState.DELETE) { + taskDriver.delete(workflowName); + + log.info("Requested delete of workflowName {}", workflowName); + } + } + } /** * A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that handle messages of type http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java index 593e72d..8758c9b 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java @@ -17,34 +17,47 @@ package org.apache.gobblin.cluster; +import java.io.IOException; import java.util.concurrent.TimeUnit; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.MetricContext; public class GobblinHelixPlanningJobLauncherMetrics extends StandardMetricsBridge.StandardMetrics { private final String metricsName; - + private final HelixJobsMapping jobsMapping; public static final String TIMER_FOR_COMPLETED_PLANNING_JOBS = "timeForCompletedPlanningJobs"; public static final String TIMER_FOR_FAILED_PLANNING_JOBS = "timeForFailedPlanningJobs"; - + public static final String NUM_ACTIVE_PLANNING_JOBS = "numActivePlanningJobs"; final ContextAwareTimer timeForCompletedPlanningJobs; final ContextAwareTimer timeForFailedPlanningJobs; + final ContextAwareGauge<Integer> numActivePlanningJobs; public GobblinHelixPlanningJobLauncherMetrics(String metricsName, final MetricContext metricContext, - int windowSizeInMin) { + int windowSizeInMin, + HelixJobsMapping jobsMapping) { this.metricsName = metricsName; - + this.jobsMapping = jobsMapping; this.timeForCompletedPlanningJobs = metricContext.contextAwareTimer(TIMER_FOR_COMPLETED_PLANNING_JOBS, windowSizeInMin, TimeUnit.MINUTES); this.timeForFailedPlanningJobs = metricContext.contextAwareTimer(TIMER_FOR_FAILED_PLANNING_JOBS, windowSizeInMin, TimeUnit.MINUTES); - + this.numActivePlanningJobs = metricContext.newContextAwareGauge(NUM_ACTIVE_PLANNING_JOBS, ()->getNumOfMappings()); this.contextAwareMetrics.add(timeForCompletedPlanningJobs); this.contextAwareMetrics.add(timeForFailedPlanningJobs); + this.contextAwareMetrics.add(numActivePlanningJobs); + } + + private int getNumOfMappings() { + try { + return this.jobsMapping.getAllStates().size(); + } catch (IOException e) { + return 0; + } } public void updateTimeForCompletedPlanningJobs(long startTime) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java index 8c2a836..8124ba1 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java @@ -19,7 +19,9 @@ package org.apache.gobblin.cluster; import java.io.IOException; import java.net.URI; +import java.util.List; import java.util.Optional; +import java.util.Properties; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; @@ -30,8 +32,10 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.JobLauncherUtils; /** @@ -58,8 +62,8 @@ public class HelixJobsMapping { public static final String DISTRIBUTED_STATE_STORE_NAME_KEY = "jobs.mapping.distributed.state.store.name"; public static final String DEFAULT_DISTRIBUTED_STATE_STORE_NAME = "distributedState"; - - private StateStore stateStore; + + private StateStore<State> stateStore; private String distributedStateStoreName; public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) { @@ -94,6 +98,16 @@ public class HelixJobsMapping { this.stateStore = stateStoreFactory.createStateStore(stateStoreJobConfig, State.class); } + public static String createPlanningJobId (Properties jobPlanningProps) { + return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX + + JobState.getJobNameFromProps(jobPlanningProps)); + } + + public static String createActualJobId (Properties jobProps) { + return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX + + JobState.getJobNameFromProps(jobProps)); + } + @Nullable private State getOrCreate (String storeName, String jobName) throws IOException { if (this.stateStore.exists(storeName, jobName)) { @@ -102,8 +116,8 @@ public class HelixJobsMapping { return new State(); } - private void delete (String storeName, String jobName) throws IOException { - this.stateStore.delete(storeName, jobName); + public void deleteMapping (String jobName) throws IOException { + this.stateStore.delete(this.distributedStateStoreName, jobName); } public void setPlanningJobId (String jobName, String planningJobId) throws IOException { @@ -112,7 +126,7 @@ public class HelixJobsMapping { state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, planningJobId); // fs state store use hdfs rename, which assumes the target file doesn't exist. if (stateStore instanceof FsStateStore) { - this.delete(distributedStateStoreName, jobName); + this.deleteMapping(jobName); } this.stateStore.put(distributedStateStoreName, jobName, state); } @@ -124,7 +138,7 @@ public class HelixJobsMapping { state.setProp(ConfigurationKeys.JOB_ID_KEY, actualJobId); // fs state store use hdfs rename, which assumes the target file doesn't exist. if (stateStore instanceof FsStateStore) { - this.delete(distributedStateStoreName, jobName); + this.deleteMapping(jobName); } this.stateStore.put(distributedStateStoreName, jobName, state); } @@ -140,6 +154,10 @@ public class HelixJobsMapping { return id == null? Optional.empty() : Optional.of(id); } + public List<State> getAllStates() throws IOException { + return this.stateStore.getAll(distributedStateStoreName); + } + public Optional<String> getActualJobId (String jobName) throws IOException { return getId(jobName, ConfigurationKeys.JOB_ID_KEY); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java index f563198..5223c71 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java @@ -17,29 +17,30 @@ package org.apache.gobblin.cluster; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Optional; import java.util.Properties; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import com.google.common.io.Closer; +import com.google.common.util.concurrent.Striped; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.runtime.JobException; -import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.api.JobExecutionMonitor; +import org.apache.gobblin.runtime.api.MutableJobCatalog; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.util.ClassAliasResolver; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.JobLauncherUtils; -import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.PropertiesUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @@ -83,6 +84,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @Alpha class HelixRetriggeringJobCallable implements Callable { private final GobblinHelixJobScheduler jobScheduler; + private final MutableJobCatalog jobCatalog; private final Properties sysProps; private final Properties jobProps; private final JobListener jobListener; @@ -95,9 +97,13 @@ class HelixRetriggeringJobCallable implements Callable { private GobblinHelixJobLauncher currentJobLauncher = null; private JobExecutionMonitor currentJobMonitor = null; private boolean isDistributeJobEnabled = false; + private final String jobUri; + private boolean jobDeleteAttempted = false; + private final Striped<Lock> locks; public HelixRetriggeringJobCallable( GobblinHelixJobScheduler jobScheduler, + MutableJobCatalog jobCatalog, Properties sysProps, Properties jobProps, JobListener jobListener, @@ -105,8 +111,11 @@ class HelixRetriggeringJobCallable implements Callable { GobblinHelixMetrics helixMetrics, Path appWorkDir, HelixManager jobHelixManager, - Optional<HelixManager> taskDriverHelixManager) { + Optional<HelixManager> taskDriverHelixManager, + HelixJobsMapping jobsMapping, + Striped<Lock> locks) { this.jobScheduler = jobScheduler; + this.jobCatalog = jobCatalog; this.sysProps = sysProps; this.jobProps = jobProps; this.jobListener = jobListener; @@ -116,9 +125,9 @@ class HelixRetriggeringJobCallable implements Callable { this.jobHelixManager = jobHelixManager; this.taskDriverHelixManager = taskDriverHelixManager; this.isDistributeJobEnabled = isDistributeJobEnabled(); - this.jobsMapping = new HelixJobsMapping(ConfigUtils.propertiesToConfig(sysProps), - PathUtils.getRootPath(appWorkDir).toUri(), - appWorkDir.toString()); + this.jobUri = jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI); + this.jobsMapping = jobsMapping; + this.locks = locks; } private boolean isRetriggeringEnabled() { @@ -137,15 +146,47 @@ class HelixRetriggeringJobCallable implements Callable { @Override public Void call() throws JobException { - if (this.isDistributeJobEnabled) { - runJobExecutionLauncher(); - } else { - runJobLauncherLoop(); + boolean deleteJobWhenException = PropertiesUtils.getPropAsBoolean(this.jobProps, + GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE, + "false"); + + try { + if (this.isDistributeJobEnabled) { + runJobExecutionLauncher(); + } else { + runJobLauncherLoop(); + } + + deleteJobSpec(); + } catch (Exception e) { // delete job spec when exception occurred + if (deleteJobWhenException) { + deleteJobSpec(); + } + throw e; } return null; } + private void deleteJobSpec() throws JobException { + boolean runOnce = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.JOB_RUN_ONCE_KEY, "false")); + boolean hasSchedule = jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY); + if (runOnce || !hasSchedule) { + if (this.jobCatalog != null) { + try { + if (!this.jobDeleteAttempted) { + log.info("Deleting job spec on {}", this.jobUri); + this.jobScheduler.unscheduleJob(this.jobUri); + this.jobCatalog.remove(new URI(jobUri)); + this.jobDeleteAttempted = true; + } + } catch (URISyntaxException e) { + log.error("Failed to remove job with bad uri " + jobUri, e); + } + } + } + } + /** * <p> In some cases, the job launcher will be early stopped. * It can be due to the large volume of input source data. @@ -181,74 +222,111 @@ class HelixRetriggeringJobCallable implements Callable { */ private void runJobExecutionLauncher() throws JobException { long startTime = 0; + String newPlanningId; + String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); + Closer closer = Closer.create(); try { + HelixManager planningJobManager = this.taskDriverHelixManager.isPresent()? + this.taskDriverHelixManager.get() : this.jobHelixManager; + String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName()); // Check if any existing planning job is running - String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); Optional<String> planningJobIdFromStore = jobsMapping.getPlanningJobId(jobName); - if (planningJobIdFromStore.isPresent()) { - String previousPlanningJobId = planningJobIdFromStore.get(); - HelixManager planningJobManager = this.taskDriverHelixManager.isPresent()? - this.taskDriverHelixManager.get() : this.jobHelixManager; + // start of critical section to check if a job with same job name is running + Lock jobLock = locks.get(jobName); + jobLock.lock(); + + try { + if (planningJobIdFromStore.isPresent()) { + String previousPlanningJobId = planningJobIdFromStore.get(); - if (HelixUtils.isJobFinished(previousPlanningJobId, previousPlanningJobId, planningJobManager)) { - log.info("Previous planning job {} has reached to the final state. Start a new one.", previousPlanningJobId); + if (HelixUtils.isJobFinished(previousPlanningJobId, previousPlanningJobId, planningJobManager)) { + log.info("Previous planning job {} has reached to the final state. Start a new one.", previousPlanningJobId); + } else { + boolean killDuplicateJob = PropertiesUtils + .getPropAsBoolean(this.jobProps, GobblinClusterConfigurationKeys.KILL_DUPLICATE_PLANNING_JOB, String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_KILL_DUPLICATE_PLANNING_JOB)); + + if (!killDuplicateJob) { + log.info("Previous planning job {} has not finished yet. Skip this job.", previousPlanningJobId); + return; + } else { + log.info("Previous planning job {} has not finished yet. Kill it.", previousPlanningJobId); + long timeOut = PropertiesUtils.getPropAsLong(sysProps, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS) * 1000; + try { + HelixUtils.deleteWorkflow(previousPlanningJobId, planningJobManager, timeOut); + } catch (HelixException e) { + log.info("Helix cannot delete previous planning job id {} within {} seconds.", previousPlanningJobId, + timeOut / 1000); + throw new JobException("Helix cannot delete previous planning job id " + previousPlanningJobId, e); + } + } + } } else { - log.info("Previous planning job {} has not finished yet. Skip it.", previousPlanningJobId); - return; + log.info("Planning job for {} does not exist. First time run.", jobName); } - } else { - log.info("Planning job for {} does not exist. First time run.", jobName); - } - GobblinHelixDistributeJobExecutionLauncher.Builder builder = GobblinConstructorUtils - .<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(new ClassAliasResolver( - GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr)); - - // Make a separate copy because we could update some of attributes in job properties (like adding planning id). - Properties jobPlanningProps = new Properties(); - jobPlanningProps.putAll(this.jobProps); - - // Inject planning id and start time - String planningId = JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX - + JobState.getJobNameFromProps(jobPlanningProps)); - jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, planningId); - jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, String.valueOf(System.currentTimeMillis())); - - builder.setSysProps(this.sysProps); - builder.setJobPlanningProps(jobPlanningProps); - builder.setJobHelixManager(this.jobHelixManager); - builder.setTaskDriverHelixManager(this.taskDriverHelixManager); - builder.setAppWorkDir(this.appWorkDir); - builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics); - builder.setHelixMetrics(this.helixMetrics); - - try (Closer closer = Closer.create()) { - log.info("Planning job {} started.", planningId); + GobblinHelixDistributeJobExecutionLauncher.Builder builder = GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor( + new ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr)); + + // Make a separate copy because we could update some of attributes in job properties (like adding planning id). + Properties jobPlanningProps = new Properties(); + jobPlanningProps.putAll(this.jobProps); + + // Inject planning id and start time + newPlanningId = HelixJobsMapping.createPlanningJobId(jobPlanningProps); + jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, newPlanningId); + jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, String.valueOf(System.currentTimeMillis())); + + builder.setSysProps(this.sysProps); + builder.setJobPlanningProps(jobPlanningProps); + builder.setJobHelixManager(this.jobHelixManager); + builder.setTaskDriverHelixManager(this.taskDriverHelixManager); + builder.setAppWorkDir(this.appWorkDir); + builder.setJobsMapping(this.jobsMapping); + builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics); + builder.setHelixMetrics(this.helixMetrics); + + log.info("Planning job {} started.", newPlanningId); GobblinHelixDistributeJobExecutionLauncher launcher = builder.build(); closer.register(launcher); - this.jobsMapping.setPlanningJobId(jobName, planningId); + this.jobsMapping.setPlanningJobId(jobName, newPlanningId); startTime = System.currentTimeMillis(); this.currentJobMonitor = launcher.launchJob(null); - this.currentJobMonitor.get(); - this.currentJobMonitor = null; - log.info("Planning job {} finished.", planningId); - this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime); - } catch (Throwable t) { - if (startTime != 0) { - this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime); - } - throw new JobException("Failed to launch and run planning job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t); + + // make sure the planning job will be visible to other parallel running threads, + // so that the same critical section check (querying Helix for job completeness) + // can be applied. + HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, newPlanningId, 300_000); + + } finally { + // end of the critical section to check if a job with same job name is running + jobLock.unlock(); } + + // we can remove the job spec from the catalog because Helix will drive this job to the end. + this.deleteJobSpec(); + + this.currentJobMonitor.get(); + this.currentJobMonitor = null; + log.info("Planning job {} finished.", newPlanningId); + this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime); + } catch (Exception e) { if (startTime != 0) { this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime); } - log.error("Failed to run planning job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); - throw new JobException("Failed to run planning job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + log.error("Failed to run planning job {}", jobName, e); + throw new JobException("Failed to run planning job " + jobName, e); + } finally { + try { + closer.close(); + } catch (IOException e) { + throw new JobException("Cannot properly close planning job " + jobName, e); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index f67e77c..69ae823 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -107,19 +107,11 @@ public class HelixUtils { submitJobToWorkFlow(jobConfigBuilder, queueName, jobName, helixTaskDriver, helixManager, jobQueueDeleteTimeoutSeconds); } - public static void submitJobToWorkFlow(JobConfig.Builder jobConfigBuilder, + static void waitJobInitialization( + HelixManager helixManager, String workFlowName, String jobName, - TaskDriver helixTaskDriver, - HelixManager helixManager, - long workFlowExpiryTime) throws Exception { - - WorkflowConfig workFlowConfig = new WorkflowConfig.Builder().setExpiry(workFlowExpiryTime, TimeUnit.SECONDS).build(); - // Create a work flow for each job with the name being the queue name - Workflow workFlow = new Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName, jobConfigBuilder).build(); - // start the workflow - helixTaskDriver.start(workFlow); - log.info("Created a work flow {}", workFlowName); + long timeoutMillis) throws Exception { WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName); // If the helix job is deleted from some other thread or a completely external process, @@ -127,15 +119,37 @@ public class HelixUtils { // 1) workflowContext did not get initialized ever, in which case we need to keep waiting, or // 2) it did get initialized but deleted soon after, in which case we should stop waiting // To overcome this issue, we wait here till workflowContext gets initialized - + long start = System.currentTimeMillis(); while (workflowContext == null || workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) == null) { + if (System.currentTimeMillis() - start > timeoutMillis) { + log.error("Job cannot be initialized within {} milliseconds, considered as an error", timeoutMillis); + throw new JobException("Job cannot be initialized within {} milliseconds, considered as an error"); + } workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName); Thread.sleep(1000); log.info("Waiting for work flow initialization."); } + log.info("Work flow {} initialized", workFlowName); } + public static void submitJobToWorkFlow(JobConfig.Builder jobConfigBuilder, + String workFlowName, + String jobName, + TaskDriver helixTaskDriver, + HelixManager helixManager, + long workFlowExpiryTime) throws Exception { + + WorkflowConfig workFlowConfig = new WorkflowConfig.Builder().setExpiry(workFlowExpiryTime, TimeUnit.SECONDS).build(); + // Create a work flow for each job with the name being the queue name + Workflow workFlow = new Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName, jobConfigBuilder).build(); + // start the workflow + helixTaskDriver.start(workFlow); + log.info("Created a work flow {}", workFlowName); + + waitJobInitialization(helixManager, workFlowName, jobName, Long.MAX_VALUE); + } + static void waitJobCompletion(HelixManager helixManager, String workFlowName, String jobName, Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java index dc88a4e..0d973af 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java @@ -95,12 +95,8 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel DistributeJobResult rst = super.getResultFromUserContent(); Assert.assertTrue(!rst.isEarlyStopped()); String jobName = this.jobPlanningProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); - String planningJobId = this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); - try { - String planningJobFromStore = this.jobsMapping.getPlanningJobId(jobName).get(); - Assert.assertTrue(planningJobFromStore.equals(planningJobId)); - + Assert.assertFalse(this.jobsMapping.getPlanningJobId(jobName).isPresent()); } catch (Exception e) { Assert.fail(e.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/754b0669/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java index 76adca4..e031d75 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java @@ -111,11 +111,10 @@ public class NonObservingFSJobCatalog extends FSJobCatalog { if (fs.exists(jobSpecPath)) { fs.delete(jobSpecPath, false); this.mutableMetrics.updateRemoveJobTime(startTime); + this.listeners.onDeleteJob(jobURI, jobSpec.getVersion()); } else { LOGGER.warn("No file with URI:" + jobSpecPath + " is found. Deletion failed."); } - - this.listeners.onDeleteJob(jobURI, jobSpec.getVersion()); } catch (IOException e) { throw new RuntimeException("When removing a JobConf. file, issues unexpected happen:" + e.getMessage()); } catch (SpecNotFoundException e) {
