This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0ac0671 [GOBBLIN-1319] fix cancellation in gobblin cluster jobs
0ac0671 is described below
commit 0ac0671251f39f9a922e68267894099cb668e896
Author: Arjun <[email protected]>
AuthorDate: Mon Nov 30 13:57:54 2020 -0800
[GOBBLIN-1319] fix cancellation in gobblin cluster jobs
Closes #3155 from arjun4084346/flowCancel
---
.../apache/gobblin/runtime/api/SpecExecutor.java | 3 +-
.../gobblin/cluster/FsJobConfigurationManager.java | 3 +
.../apache/gobblin/cluster/GobblinHelixJob.java | 4 +-
.../gobblin/cluster/GobblinHelixJobScheduler.java | 48 +++++++-
.../gobblin/cluster/GobblinHelixJobTask.java | 19 ++--
.../apache/gobblin/cluster/HelixJobsMapping.java | 68 ++++++++----
.../cluster/HelixRetriggeringJobCallable.java | 123 ++++++++++++++-------
.../gobblin/cluster/JobConfigurationManager.java | 6 +
.../cluster/ScheduledJobConfigurationManager.java | 6 +-
.../cluster/StreamingJobConfigurationManager.java | 5 +-
.../cluster/event/CancelJobConfigArrivalEvent.java | 38 +++++++
.../service/StreamingKafkaSpecConsumer.java | 12 ++
.../gobblin/runtime/api/JobCatalogListener.java | 18 +++
.../gobblin/runtime/api/MutableJobCatalog.java | 4 +
.../job_catalog/JobCatalogListenersList.java | 11 ++
.../job_catalog/NonObservingFSJobCatalog.java | 10 ++
.../runtime/job_monitor/KafkaJobMonitor.java | 2 +-
.../runtime/job_monitor/MockedKafkaJobMonitor.java | 2 +-
18 files changed, 300 insertions(+), 82 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
index 8569275..c045c5a 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
@@ -59,7 +59,8 @@ public interface SpecExecutor {
ADD(1, "add"),
UPDATE(2, "update"),
DELETE(3, "delete"),
- UNKNOWN(4, "unknown");
+ UNKNOWN(4, "unknown"),
+ CANCEL(5, "cancel");
private int _id;
private String _verb;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
index 3b65e42..2563201 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
@@ -114,6 +114,9 @@ public class FsJobConfigurationManager extends
JobConfigurationManager {
this.jobCatalogOptional.get().remove(jobSpec.getUri());
}
postDeleteJobConfigArrival(jobSpec.getUri().toString(),
jobSpec.getConfigAsProperties());
+ } else if (verb.equals(SpecExecutor.Verb.CANCEL)) {
+ // Handle cancel
+ postCancelJobConfigArrival(jobSpec.getUri().toString());
}
try {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
index db34e3e..fe3cc17 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java
@@ -44,7 +44,7 @@ import org.apache.gobblin.scheduler.JobScheduler;
@Alpha
@Slf4j
public class GobblinHelixJob extends BaseGobblinJob implements
InterruptableJob {
- private Future cancellable = null;
+ private Future<?> cancellable = null;
@Override
public void executeImpl(JobExecutionContext context) throws
JobExecutionException {
@@ -56,7 +56,7 @@ public class GobblinHelixJob extends BaseGobblinJob
implements InterruptableJob
final JobListener jobListener = (JobListener)
dataMap.get(JobScheduler.JOB_LISTENER_KEY);
try {
- if
(Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD,
+ if
(Boolean.parseBoolean(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD,
Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT))))
{
jobScheduler.runJob(jobProps, jobListener);
} else {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 8959378..e26f5dd 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.cluster;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -50,6 +51,7 @@ import com.google.common.util.concurrent.Striped;
import com.typesafe.config.Config;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
@@ -84,7 +86,7 @@ import org.apache.gobblin.util.PropertiesUtils;
* <p> More details can be found at {@link HelixRetriggeringJobCallable}.
*/
@Alpha
-public class GobblinHelixJobScheduler extends JobScheduler implements
StandardMetricsBridge{
+public class GobblinHelixJobScheduler extends JobScheduler implements
StandardMetricsBridge {
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinHelixJobScheduler.class);
private static final String COMMON_JOB_PROPS = "gobblin.common.job.props";
@@ -205,9 +207,9 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
if (cleanAllDistJobs) {
for (org.apache.gobblin.configuration.State state :
this.jobsMapping.getAllStates()) {
- String jobName = state.getId();
- LOGGER.info("Delete mapping for job " + jobName);
- this.jobsMapping.deleteMapping(jobName);
+ String jobUri = state.getId();
+ LOGGER.info("Delete mapping for job " + jobUri);
+ this.jobsMapping.deleteMapping(jobUri);
}
}
}
@@ -346,7 +348,7 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
}
private void waitForJobCompletion(String jobName) {
- while (this.jobRunningMap.getOrDefault(jobName, false) != false) {
+ while (this.jobRunningMap.getOrDefault(jobName, false)) {
LOGGER.info("Waiting for job {} to stop...", jobName);
try {
Thread.sleep(1000);
@@ -367,6 +369,42 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
}
}
+ @Subscribe
+ public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent
cancelJobArrival)
+ throws InterruptedException {
+ String jobUri = cancelJobArrival.getJoburi();
+ LOGGER.info("Received cancel for job configuration of job " + jobUri);
+ Optional<String> distributedJobMode;
+ Optional<String> planningJob = Optional.empty();
+ Optional<String> actualJob = Optional.empty();
+
+ this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
+
+ try {
+ distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri);
+ if (distributedJobMode.isPresent() &&
Boolean.parseBoolean(distributedJobMode.get())) {
+ planningJob = this.jobsMapping.getPlanningJobId(jobUri);
+ } else {
+ actualJob = this.jobsMapping.getActualJobId(jobUri);
+ }
+ } catch (IOException e) {
+ LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri);
+ return;
+ }
+
+ if (planningJob.isPresent()) {
+ LOGGER.info("Cancelling planning job helix workflow: {}",
planningJob.get());
+ new
TaskDriver(this.taskDriverHelixManager.get()).waitToStop(planningJob.get(),
this.helixJobStopTimeoutMillis);
+ }
+
+ if (actualJob.isPresent()) {
+ LOGGER.info("Cancelling actual job helix workflow: {}", actualJob.get());
+ new TaskDriver(this.jobHelixManager).waitToStop(actualJob.get(),
this.helixJobStopTimeoutMillis);
+ }
+
+ this.jobSchedulerMetrics.numCancellationStart.decrementAndGet();
+ }
+
private void cancelJobIfRequired(DeleteJobConfigArrivalEvent
deleteJobArrival) throws InterruptedException {
Properties jobConfig = deleteJobArrival.getJobConfig();
if (PropertiesUtils.getPropAsBoolean(jobConfig,
GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index aa2d7fa..78127b2 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -65,7 +65,7 @@ class GobblinHelixJobTask implements Task {
private final String planningJobId;
private final HelixManager jobHelixManager;
private final Path appWorkDir;
- private final String jobName;
+ private final String jobUri;
private final List<? extends Tag<?>> metadataTags;
private GobblinHelixJobLauncher launcher;
private GobblinHelixJobTaskMetrics jobTaskMetrics;
@@ -100,7 +100,7 @@ class GobblinHelixJobTask implements Task {
throw new RuntimeException("Job doesn't have planning ID");
}
- this.jobName =
jobPlusSysConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+ this.jobUri =
jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI);
this.planningJobId =
jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
this.jobsMapping = jobsMapping;
this.appWorkDir = builder.getAppWorkPath();
@@ -149,7 +149,7 @@ class GobblinHelixJobTask implements Task {
this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig);
try (Closer closer = Closer.create()) {
- Optional<String> planningIdFromStateStore =
this.jobsMapping.getPlanningJobId(jobName);
+ Optional<String> planningIdFromStateStore =
this.jobsMapping.getPlanningJobId(jobUri);
long timeOut = PropertiesUtils.getPropAsLong(jobPlusSysConfig,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS,
@@ -161,7 +161,7 @@ class GobblinHelixJobTask implements Task {
}
while (true) {
- Optional<String> actualJobIdFromStateStore =
this.jobsMapping.getActualJobId(jobName);
+ Optional<String> actualJobIdFromStateStore =
this.jobsMapping.getActualJobId(jobUri);
if (actualJobIdFromStateStore.isPresent()) {
String previousActualJobId = actualJobIdFromStateStore.get();
if (HelixUtils.isJobFinished(previousActualJobId,
previousActualJobId, this.jobHelixManager)) {
@@ -186,7 +186,7 @@ class GobblinHelixJobTask implements Task {
this.launcher = createJobLauncher();
- this.jobsMapping.setActualJobId(jobName, this.planningJobId,
this.launcher.getJobId());
+ this.jobsMapping.setActualJobId(jobUri, this.planningJobId,
this.launcher.getJobId());
closer.register(launcher).launchJob(this.jobLauncherListener);
@@ -205,9 +205,9 @@ class GobblinHelixJobTask implements Task {
} finally {
// always cleanup the job mapping for current job name.
try {
- this.jobsMapping.deleteMapping(jobName);
+ this.jobsMapping.deleteMapping(jobUri);
} catch (Exception e) {
- return new TaskResult(TaskResult.Status.FAILED,"Cannot delete jobs
mapping for job : " + jobName);
+ return new TaskResult(TaskResult.Status.FAILED,"Cannot delete jobs
mapping for job : " + jobUri);
}
}
}
@@ -217,15 +217,16 @@ class GobblinHelixJobTask implements Task {
log.info("Cancelling planning job {}", this.planningJobId);
if (launcher != null) {
try {
+ // TODO: confirm that this cancel also cancels the Helix job submitted by the run method.
launcher.cancelJob(this.jobLauncherListener);
} catch (JobException e) {
throw new RuntimeException("Unable to cancel planning job " +
this.planningJobId + ": ", e);
} finally {
// always cleanup the job mapping for current job name.
try {
- this.jobsMapping.deleteMapping(jobName);
+ this.jobsMapping.deleteMapping(jobUri);
} catch (Exception e) {
- throw new RuntimeException("Cannot delete jobs mapping for job : " +
jobName);
+ throw new RuntimeException("Cannot delete jobs mapping for job : " +
jobUri);
}
}
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index 11a90da..ab4d87c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -116,49 +116,73 @@ public class HelixJobsMapping {
this.stateStore.delete(this.distributedStateStoreName, jobName);
}
- public void setPlanningJobId (String jobName, String planningJobId) throws
IOException {
- State state = getOrCreate(distributedStateStoreName, jobName);
- state.setId(jobName);
+ public void setPlanningJobId (String jobUri, String planningJobId) throws
IOException {
+ State state = getOrCreate(distributedStateStoreName, jobUri);
+ state.setId(jobUri);
state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY,
planningJobId);
- // fs state store use hdfs rename, which assumes the target file doesn't
exist.
- if (stateStore instanceof FsStateStore) {
- this.deleteMapping(jobName);
- }
- this.stateStore.put(distributedStateStoreName, jobName, state);
+ writeToStateStore(jobUri, state);
}
- public void setActualJobId (String jobName, String planningJobId, String
actualJobId) throws IOException {
- State state = getOrCreate(distributedStateStoreName, jobName);
- state.setId(jobName);
- state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY,
planningJobId);
+ public void setActualJobId (String jobUri, String actualJobId) throws
IOException {
+ setActualJobId(jobUri, null, actualJobId);
+ }
+
+ public void setActualJobId (String jobUri, String planningJobId, String
actualJobId) throws IOException {
+ State state = getOrCreate(distributedStateStoreName, jobUri);
+ state.setId(jobUri);
+ if (null != planningJobId) {
+ state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY,
planningJobId);
+ }
state.setProp(ConfigurationKeys.JOB_ID_KEY, actualJobId);
+ writeToStateStore(jobUri, state);
+ }
+
+ public void setDistributedJobMode(String jobUri, boolean distributedJobMode)
throws IOException {
+ State state = getOrCreate(distributedStateStoreName, jobUri);
+ state.setId(jobUri);
+
state.setProp(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED,
distributedJobMode);
+ writeToStateStore(jobUri, state);
+ }
+
+ private void writeToStateStore(String jobUri, State state) throws
IOException {
// fs state store use hdfs rename, which assumes the target file doesn't
exist.
- if (stateStore instanceof FsStateStore) {
- this.deleteMapping(jobName);
+ if (this.stateStore instanceof FsStateStore) {
+ this.deleteMapping(jobUri);
}
- this.stateStore.put(distributedStateStoreName, jobName, state);
+ this.stateStore.put(distributedStateStoreName, jobUri, state);
}
- private Optional<String> getId (String jobName, String idName) throws
IOException {
- State state = this.stateStore.get(distributedStateStoreName, jobName,
jobName);
+ private Optional<String> getId (String jobUri, String idName) throws
IOException {
+ State state = this.stateStore.get(distributedStateStoreName, jobUri,
jobUri);
if (state == null) {
return Optional.empty();
}
String id = state.getProp(idName);
- return id == null? Optional.empty() : Optional.of(id);
+ return id == null ? Optional.empty() : Optional.of(id);
}
public List<State> getAllStates() throws IOException {
return this.stateStore.getAll(distributedStateStoreName);
}
- public Optional<String> getActualJobId (String jobName) throws IOException {
- return getId(jobName, ConfigurationKeys.JOB_ID_KEY);
+ public Optional<String> getActualJobId (String jobUri) throws IOException {
+ return getId(jobUri, ConfigurationKeys.JOB_ID_KEY);
+ }
+
+ public Optional<String> getPlanningJobId (String jobUri) throws IOException {
+ return getId(jobUri, GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
}
- public Optional<String> getPlanningJobId (String jobName) throws IOException
{
- return getId(jobName, GobblinClusterConfigurationKeys.PLANNING_ID_KEY);
+ public Optional<String> getDistributedJobMode(String jobUri) throws
IOException {
+ State state = this.stateStore.get(distributedStateStoreName, jobUri,
jobUri);
+ if (state == null) {
+ return Optional.empty();
+ }
+
+ String id =
state.getProp(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED);
+
+ return id == null ? Optional.empty() : Optional.of(id);
}
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index bfa2dce..251cb0e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -153,6 +153,12 @@ class HelixRetriggeringJobCallable implements Callable {
"false");
try {
+ this.jobsMapping.setDistributedJobMode(this.jobUri,
this.isDistributeJobEnabled);
+ } catch (IOException e) {
+ throw new JobException("Could not update jobsMapping", e);
+ }
+
+ try {
if (this.isDistributeJobEnabled) {
runJobExecutionLauncher();
} else {
@@ -197,8 +203,25 @@ class HelixRetriggeringJobCallable implements Callable {
*/
private void runJobLauncherLoop() throws JobException {
try {
+ // Check if any existing actual job is running
+ Optional<String> actualJobIdFromStore =
jobsMapping.getActualJobId(this.jobUri);
+
+ if (actualJobIdFromStore.isPresent() &&
!canRun(actualJobIdFromStore.get(), this.jobHelixManager)) {
+ return;
+ }
+
+ String actualJobId = jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)
+ ? jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY) :
HelixJobsMapping.createActualJobId(jobProps);
+ log.info("Job {} creates actual job {}", this.jobUri, actualJobId);
+
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, actualJobId);
+
+ /* create the job launcher after setting ConfigurationKeys.JOB_ID_KEY */
+ currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps);
+
+ this.jobsMapping.setActualJobId(this.jobUri, actualJobId);
+
while (true) {
- currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps);
// in "run once" case, job scheduler will remove current job from the
scheduler
boolean isEarlyStopped = this.jobScheduler.runJob(jobProps,
jobListener, currentJobLauncher);
boolean isRetriggerEnabled = this.isRetriggeringEnabled();
@@ -207,11 +230,19 @@ class HelixRetriggeringJobCallable implements Callable {
} else {
break;
}
- currentJobLauncher = null;
}
+
} catch (Exception e) {
log.error("Failed to run job {}",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
throw new JobException("Failed to run job " +
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+ } finally {
+ currentJobLauncher = null;
+ // always cleanup the job mapping for current job name.
+ try {
+ this.jobsMapping.deleteMapping(jobUri);
+ } catch (Exception e) {
+ throw new JobException("Cannot delete jobs mapping for job : " +
jobUri);
+ }
}
}
@@ -225,7 +256,6 @@ class HelixRetriggeringJobCallable implements Callable {
private void runJobExecutionLauncher() throws JobException {
long startTime = 0;
String newPlanningId;
- String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
Closer closer = Closer.create();
try {
HelixManager planningJobManager =
this.taskDriverHelixManager.isPresent()?
@@ -235,42 +265,19 @@ class HelixRetriggeringJobCallable implements Callable {
GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
// Check if any existing planning job is running
- Optional<String> planningJobIdFromStore =
jobsMapping.getPlanningJobId(jobName);
+ Optional<String> planningJobIdFromStore =
jobsMapping.getPlanningJobId(this.jobUri);
boolean nonblocking = false;
// start of critical section to check if a job with same job name is
running
- Lock jobLock = locks.get(jobName);
+ Lock jobLock = locks.get(this.jobUri);
jobLock.lock();
try {
- if (planningJobIdFromStore.isPresent()) {
- String previousPlanningJobId = planningJobIdFromStore.get();
-
- if (HelixUtils.isJobFinished(previousPlanningJobId,
previousPlanningJobId, planningJobManager)) {
- log.info("Previous planning job {} has reached to the final state.
Start a new one.", previousPlanningJobId);
- } else {
- boolean killDuplicateJob = PropertiesUtils
- .getPropAsBoolean(this.jobProps,
GobblinClusterConfigurationKeys.KILL_DUPLICATE_PLANNING_JOB,
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_KILL_DUPLICATE_PLANNING_JOB));
-
- if (!killDuplicateJob) {
- log.info("Previous planning job {} has not finished yet. Skip
this job.", previousPlanningJobId);
- return;
- } else {
- log.info("Previous planning job {} has not finished yet. Kill
it.", previousPlanningJobId);
- long timeOut = PropertiesUtils.getPropAsLong(sysProps,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS,
-
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS)
* 1000;
- try {
- HelixUtils.deleteWorkflow(previousPlanningJobId,
planningJobManager, timeOut);
- } catch (HelixException e) {
- log.info("Helix cannot delete previous planning job id {}
within {} seconds.", previousPlanningJobId,
- timeOut / 1000);
- throw new JobException("Helix cannot delete previous planning
job id " + previousPlanningJobId, e);
- }
- }
- }
- } else {
- log.info("Planning job for {} does not exist. First time run.",
jobName);
+ if (planningJobIdFromStore.isPresent() &&
!canRun(planningJobIdFromStore.get(), planningJobManager)) {
+ return;
}
+ log.info("Planning job for {} does not exist. First time run.",
this.jobUri);
+
GobblinHelixDistributeJobExecutionLauncher.Builder builder =
GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(
new
ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr));
@@ -297,13 +304,14 @@ class HelixRetriggeringJobCallable implements Callable {
Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
.withFallback(ConfigUtils.propertiesToConfig(sysProps));
- nonblocking = ConfigUtils.getBoolean(combined,
GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
-
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
+ nonblocking = ConfigUtils
+ .getBoolean(combined,
GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
+
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
log.info("Planning job {} started.", newPlanningId);
GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
closer.register(launcher);
- this.jobsMapping.setPlanningJobId(jobName, newPlanningId);
+ this.jobsMapping.setPlanningJobId(this.jobUri, newPlanningId);
startTime = System.currentTimeMillis();
this.currentJobMonitor = launcher.launchJob(null);
@@ -311,7 +319,6 @@ class HelixRetriggeringJobCallable implements Callable {
// so that the same critical section check (querying Helix for job
completeness)
// can be applied.
HelixUtils.waitJobInitialization(planningJobManager, newPlanningId,
newPlanningId);
-
} finally {
// end of the critical section to check if a job with same job name is
running
jobLock.unlock();
@@ -334,15 +341,53 @@ class HelixRetriggeringJobCallable implements Callable {
if (startTime != 0) {
this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
}
- log.error("Failed to run planning job {}", jobName, e);
- throw new JobException("Failed to run planning job " + jobName, e);
+ log.error("Failed to run planning job for {}", this.jobUri, e);
+ throw new JobException("Failed to run planning job for " + this.jobUri,
e);
} finally {
try {
closer.close();
} catch (IOException e) {
- throw new JobException("Cannot properly close planning job " +
jobName, e);
+ throw new JobException("Cannot properly close planning job for " +
this.jobUri, e);
+ }
+ }
+ }
+
+ /**
+ * This method checks if a job can be submitted to helix for execution.
+ * A job can run if 1) the previous job with the same job id has finished, or
+ * 2) the previous job is still running but may be killed, as specified by the property
+ * {@link GobblinClusterConfigurationKeys#KILL_DUPLICATE_PLANNING_JOB} (default being true).
+ * @param previousJobId job id from the previous execution
+ * @param helixManager helix manager
+ * @return true if the job can run on Helix
+ * @throws JobException if previous job deletion fails
+ * @throws InterruptedException
+ */
+ private boolean canRun(String previousJobId, HelixManager helixManager)
throws JobException, InterruptedException {
+ if (HelixUtils.isJobFinished(previousJobId, previousJobId, helixManager)) {
+ log.info("Previous planning job {} has reached to the final state. Start
a new one.", previousJobId);
+ } else {
+ boolean killDuplicateJob = PropertiesUtils
+ .getPropAsBoolean(this.jobProps,
GobblinClusterConfigurationKeys.KILL_DUPLICATE_PLANNING_JOB,
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_KILL_DUPLICATE_PLANNING_JOB));
+
+ if (!killDuplicateJob) {
+ log.info("Previous planning job {} has not finished yet. Skip this
job.", previousJobId);
+ return false;
+ } else {
+ log.info("Previous planning job {} has not finished yet. Kill it.",
previousJobId);
+ long timeOut = PropertiesUtils
+ .getPropAsLong(sysProps,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS)
* 1000;
+ try {
+ HelixUtils.deleteWorkflow(previousJobId, helixManager, timeOut);
+ } catch (HelixException e) {
+ log.info("Helix cannot delete previous planning job id {} within {}
seconds.", previousJobId,
+ timeOut / 1000);
+ throw new JobException("Helix cannot delete previous planning job id
" + previousJobId, e);
+ }
}
}
+ return true;
}
void cancel() throws JobException {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
index c2ba8a4..86b5219 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Properties;
+import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,4 +133,9 @@ public class JobConfigurationManager extends
AbstractIdleService implements Stan
LOGGER.info(String.format("Posting delete JobConfig with name: %s and
config: %s", jobName, jobConfig));
this.eventBus.post(new DeleteJobConfigArrivalEvent(jobName, jobConfig));
}
+
+ protected void postCancelJobConfigArrival(String jobUri) {
+ LOGGER.info(String.format("Posting cancel JobConfig with name: %s",
jobUri));
+ this.eventBus.post(new CancelJobConfigArrivalEvent(jobUri));
+ }
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
index e311507..601ed90 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java
@@ -140,7 +140,11 @@ public class ScheduledJobConfigurationManager extends
JobConfigurationManager {
Spec anonymousSpec = (Spec) entry.getValue();
postDeleteJobConfigArrival(anonymousSpec.getUri().toString(), new
Properties());
jobSpecs.remove(entry.getValue().getUri());
- }
+ } else if (verb.equals(SpecExecutor.Verb.CANCEL)) {
+ // Handle cancel
+ Spec anonymousSpec = entry.getValue();
+ postCancelJobConfigArrival(anonymousSpec.getUri().toString());
+ }
}
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
index a18e759..d731e80 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
@@ -158,8 +158,11 @@ public class StreamingJobConfigurationManager extends
JobConfigurationManager {
postUpdateJobConfigArrival(jobSpec.getUri().toString(),
jobSpec.getConfigAsProperties());
} else if (verb.equals(SpecExecutor.Verb.DELETE)) {
// Handle delete
- Spec anonymousSpec = (Spec) entry.getValue();
+ Spec anonymousSpec = entry.getValue();
postDeleteJobConfigArrival(anonymousSpec.getUri().toString(), new
Properties());
+ } else if (verb.equals(SpecExecutor.Verb.CANCEL)) {
+ Spec anonymousSpec = entry.getValue();
+ postCancelJobConfigArrival(anonymousSpec.getUri().toString());
}
}
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/CancelJobConfigArrivalEvent.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/CancelJobConfigArrivalEvent.java
new file mode 100644
index 0000000..6be9e8f
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/CancelJobConfigArrivalEvent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.event;
+
+import java.util.Properties;
+
+
+public class CancelJobConfigArrivalEvent {
+ private final String jobUri;
+
+ public CancelJobConfigArrivalEvent(String jobUri) {
+ this.jobUri = jobUri;
+ }
+
+ /**
+ * Get the job uri.
+ *
+ * @return the job uri
+ */
+ public String getJoburi() {
+ return this.jobUri;
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index f7a0614..515a108 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -179,6 +179,18 @@ public class StreamingKafkaSpecConsumer extends
AbstractIdleService implements S
}
}
+ @Override
+ public void onCancelJob(URI cancelledJobURI) {
+ super.onCancelJob(cancelledJobURI);
+ try {
+ JobSpec.Builder jobSpecBuilder = JobSpec.builder(cancelledJobURI);
+ jobSpecBuilder.withConfigAsProperties(new Properties());
+ _jobSpecQueue.put(new ImmutablePair<>(SpecExecutor.Verb.CANCEL,
jobSpecBuilder.build()));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
@Override public void onUpdateJob(JobSpec updatedJob) {
super.onUpdateJob(updatedJob);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
index c670573..7ef97e6 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
@@ -37,6 +37,9 @@ public interface JobCatalogListener {
*/
public void onDeleteJob(URI deletedJobURI, String deletedJobVersion);
+ /**
+ * Invoked to signal cancellation of the job with the given URI. The default
+ * implementation does nothing, so listeners only override it if they need to
+ * react to cancellations.
+ *
+ * @param cancelledJobURI URI of the cancelled job
+ */
+ default void onCancelJob(URI cancelledJobURI) {
+ }
+
/**
* Invoked when the contents of a JobSpec gets updated in the catalog.
*/
@@ -76,6 +79,21 @@ public interface JobCatalogListener {
}
}
+ /**
+ * Callback that calls {@code onCancelJob} on a {@link JobCatalogListener} with
+ * the job URI captured at construction time.
+ */
+ public static class CancelJobCallback extends Callback<JobCatalogListener,
Void> {
+ private final URI _cancelledJobURI;
+
+ /**
+ * @param cancelledJobURI URI handed to each listener's {@code onCancelJob}
+ */
+ public CancelJobCallback(URI cancelledJobURI) {
+ // The string passed to the superclass names the listener method and the URI;
+ // NOTE(review): presumably used as the callback's display name -- confirm in Callback.
+ super(Objects.toStringHelper("onCancelJob").add("cancelJob",
cancelledJobURI).toString());
+ _cancelledJobURI = cancelledJobURI;
+ }
+
+ /**
+ * Notifies the given listener of the cancellation.
+ *
+ * @param listener listener to notify
+ * @return always {@code null}
+ */
+ @Override
+ public Void apply(JobCatalogListener listener) {
+ listener.onCancelJob(_cancelledJobURI);
+ return null;
+ }
+ }
+
public static class UpdateJobCallback extends Callback<JobCatalogListener,
Void> {
private final JobSpec _updatedJob;
public UpdateJobCallback(JobSpec updatedJob) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
index 57dfce5..4c12140 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
@@ -48,6 +48,10 @@ public interface MutableJobCatalog extends JobCatalog {
* */
public void put(JobSpec jobSpec);
+ /**
+ * Variant of {@code remove(URI)} that takes an extra {@code alwaysTriggerListeners} flag.
+ * This default implementation removes nothing: it always throws, so catalogs that
+ * support the flag must override it.
+ *
+ * @param uri URI of the JobSpec to remove
+ * @param alwaysTriggerListeners NOTE(review): presumably requests that listeners be notified
+ * even when no JobSpec exists for the URI; confirm against the overriding implementations
+ * @throws UnsupportedOperationException always, unless the method is overridden
+ */
+ default void remove(URI uri, boolean alwaysTriggerListeners) {
+ throw new UnsupportedOperationException("Method not implemented by " +
this.getClass());
+ }
+
/**
* Removes an existing JobSpec with the given URI. A no-op if such JobSpec
does not exist.
*/
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogListenersList.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogListenersList.java
index 404d8f2..48fb737 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogListenersList.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogListenersList.java
@@ -99,6 +99,17 @@ public class JobCatalogListenersList implements
JobCatalogListener, JobCatalogLi
}
+ /**
+ * Runs a {@link CancelJobCallback} for the given URI through the callbacks dispatcher.
+ * If the dispatch is interrupted, a warning is logged and the thread's interrupt
+ * status is restored.
+ *
+ * @param cancelledJobURI URI of the cancelled job; checked with {@code Preconditions.checkNotNull}
+ */
@Override
+ public synchronized void onCancelJob(URI cancelledJobURI) {
+ Preconditions.checkNotNull(cancelledJobURI);
+
+ try {
+ _disp.execCallbacks(new CancelJobCallback(cancelledJobURI));
+ } catch (InterruptedException e) {
+ getLog().warn("onCancelJob interrupted.");
+ // Catching InterruptedException clears the interrupt status; set it again
+ // so code further up the stack can still observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
public void close()
throws IOException {
_disp.close();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
index e031d75..0cde6c0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
@@ -102,6 +102,10 @@ public class NonObservingFSJobCatalog extends FSJobCatalog
{
*/
@Override
public synchronized void remove(URI jobURI) {
+ // Delegates to the two-argument overload with alwaysTriggerListeners set to false.
+ remove(jobURI, false);
+ }
+
+ public synchronized void remove(URI jobURI, boolean alwaysTriggerListeners) {
Preconditions.checkState(state() == State.RUNNING, String.format("%s is
not running.", this.getClass().getName()));
try {
long startTime = System.currentTimeMillis();
@@ -119,6 +123,12 @@ public class NonObservingFSJobCatalog extends FSJobCatalog
{
throw new RuntimeException("When removing a JobConf. file, issues
unexpected happen:" + e.getMessage());
} catch (SpecNotFoundException e) {
LOGGER.warn("No file with URI:" + jobURI + " is found. Deletion
failed.");
+ } finally {
+ // HelixRetriggeringJobCallable deletes the job file after submitting it to Helix,
+ // so listeners must be triggered to cancel the job regardless of whether the job file still exists.
+ if (alwaysTriggerListeners) {
+ this.listeners.onCancelJob(jobURI);
+ }
}
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index ba9f06f..38c3c68 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -108,7 +108,7 @@ public abstract class KafkaJobMonitor extends
HighLevelConsumer<byte[], byte[]>
} else if (parsedMessage instanceof Either.Right) {
this.removedSpecs.inc();
URI jobSpecUri = ((Either.Right<JobSpec, URI>)
parsedMessage).getRight();
- this.jobCatalog.remove(jobSpecUri);
+ this.jobCatalog.remove(jobSpecUri, true);
// Delete the job state if it is a delete spec request
deleteStateStore(jobSpecUri);
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
index 0562534..b68bc1a 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
@@ -90,7 +90,7 @@ public class MockedKafkaJobMonitor extends KafkaJobMonitor {
jobSpecs.remove(uri);
return null;
}
- }).when(jobCatalog).remove(Mockito.any(URI.class));
+ }).when(jobCatalog).remove(Mockito.any(URI.class), Mockito.anyBoolean());
return jobCatalog;
}