Repository: incubator-gobblin Updated Branches: refs/heads/master be075c629 -> 3bc3d3691
[GOBBLIN-532] Add option to delete job no matter if it is successful or not Closes #2395 from yukuai518/deleteAlways Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3bc3d369 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3bc3d369 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3bc3d369 Branch: refs/heads/master Commit: 3bc3d3691fafdc32128a7511ea2a69d5ba4ddc2e Parents: be075c6 Author: Kuai Yu <[email protected]> Authored: Fri Jul 13 10:20:38 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Jul 13 10:20:38 2018 -0700 ---------------------------------------------------------------------- .../cluster/GobblinClusterConfigurationKeys.java | 6 ++++++ .../gobblin/cluster/GobblinHelixJobScheduler.java | 12 ++++++++++++ 2 files changed, 18 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3bc3d369/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 648b5bc..b0badc4 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -68,6 +68,8 @@ public class GobblinClusterConfigurationKeys { // Should job be executed in the scheduler thread? public static final String JOB_EXECUTE_IN_SCHEDULING_THREAD = GOBBLIN_CLUSTER_PREFIX + "job.executeInSchedulingThread"; public static final boolean JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT = true; + + // Helix related tagging public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag"; public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags"; @@ -76,6 +78,10 @@ public class GobblinClusterConfigurationKeys { public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + "planning."; public static final String PLANNING_ID_KEY = PLANNING_CONF_PREFIX + "idKey"; + // job spec operation + public static final String JOB_ALWAYS_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.alwaysDelete"; + + /** * A path pointing to a directory that contains job execution files to be executed by Gobblin. This directory can * have a nested structure. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3bc3d369/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index b991406..3f53c23 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -62,6 +62,7 @@ import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.scheduler.JobScheduler; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PropertiesUtils; /** @@ -403,6 +404,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override public void run() { + boolean isDeleted = false; try { ((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig); ((MetricsTrackingListener)jobListener).metrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis()); @@ -412,11 +414,21 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe if (GobblinHelixJobScheduler.this.jobCatalog != null) { try { GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri)); + isDeleted = true; } catch (URISyntaxException e) { LOGGER.error("Failed to remove job with bad uri " + jobUri, e); } } } catch (JobException je) { + boolean alwaysDelete = PropertiesUtils + .getPropAsBoolean(this.jobConfig, GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE, "false"); + if (alwaysDelete && !isDeleted) { + try { + GobblinHelixJobScheduler.this.jobCatalog.remove(new URI(jobUri)); + } catch (URISyntaxException e) { + LOGGER.error("Always delete " + jobUri + ". Failed to remove job with bad uri " + jobUri, e); + } + } LOGGER.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); } }
