This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3e38ce3 [GOBBLIN-703] Allow planning job to be running in
non-blocking mode
3e38ce3 is described below
commit 3e38ce3b963b746f1ee55e5a6261f454f8eefaa2
Author: Kuai Yu <[email protected]>
AuthorDate: Mon Mar 25 17:13:05 2019 -0700
[GOBBLIN-703] Allow planning job to be running in non-blocking mode
Closes #2573 from yukuai518/nonblock
---
.../cluster/GobblinClusterConfigurationKeys.java | 2 ++
...GobblinHelixDistributeJobExecutionLauncher.java | 31 +++++++++++++++-------
.../cluster/HelixRetriggeringJobCallable.java | 25 +++++++++++++----
.../org/apache/gobblin/cluster/HelixUtils.java | 1 -
.../cluster/TaskRunnerSuiteForJobFactoryTest.java | 1 -
5 files changed, 44 insertions(+), 16 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index faeaf17..a760c65 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -151,6 +151,8 @@ public class GobblinClusterConfigurationKeys {
public static final String CLEAN_ALL_DIST_JOBS = GOBBLIN_CLUSTER_PREFIX +
"bootup.clean.dist.jobs";
public static final boolean DEFAULT_CLEAN_ALL_DIST_JOBS = false;
+ public static final String NON_BLOCKING_PLANNING_JOB_ENABLED =
GOBBLIN_CLUSTER_PREFIX + "nonBlocking.planningJob.enabled";
+ public static final boolean DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED =
false;
public static final String KILL_DUPLICATE_PLANNING_JOB =
GOBBLIN_CLUSTER_PREFIX + "kill.duplicate.planningJob";
public static final boolean DEFAULT_KILL_DUPLICATE_PLANNING_JOB = true;
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index b14b700..36239a2 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -105,6 +105,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements
JobExecutionLauncher
private volatile boolean cancellationRequested = false;
// A flag indicating whether a cancellation has been executed or not
private volatile boolean cancellationExecuted = false;
+ // A flag indicating whether the planning job is submitted without waiting for its completion (non-blocking mode)
+ private boolean nonBlockingMode = false;
+
@Getter
private DistributeJobMonitor jobMonitor;
@@ -126,6 +129,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements
JobExecutionLauncher
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics;
+ this.nonBlockingMode = ConfigUtils.getBoolean(combined,
+ GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
+
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
this.helixMetrics = builder.helixMetrics;
this.jobsMapping = builder.jobsMapping;
this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(combined,
@@ -274,12 +280,18 @@ class GobblinHelixDistributeJobExecutionLauncher
implements JobExecutionLauncher
submitJobToHelix(planningId, planningId, builder);
GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixSubmit(submitStartTime);
long waitStartTime = System.currentTimeMillis();
- DistributeJobResult rst = waitForJobCompletion(planningId, planningId);
-
GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixWait(waitStartTime);
+
+ // we should not wait if in non-blocking mode.
+ DistributeJobResult rst = new DistributeJobResult();
+ if (!GobblinHelixDistributeJobExecutionLauncher.this.nonBlockingMode) {
+ rst = waitForJobCompletion(planningId, planningId);
+
GobblinHelixDistributeJobExecutionLauncher.this.helixMetrics.updateTimeForHelixWait(waitStartTime);
+ }
+
return rst;
} catch (Exception e) {
log.error(planningId + " is not able to submit.");
- return new DistributeJobResult(false);
+ return new DistributeJobResult();
}
}
}
@@ -302,19 +314,20 @@ class GobblinHelixDistributeJobExecutionLauncher
implements JobExecutionLauncher
} catch (TimeoutException te) {
HelixUtils.handleJobTimeout(workFlowName, jobName,
planningJobHelixManager, this, null);
- return new DistributeJobResult(false);
+ return new DistributeJobResult();
}
}
- //TODO: we will remove this logic after we change to polling model.
protected DistributeJobResult getResultFromUserContent() {
- return new DistributeJobResult(false);
+ return new DistributeJobResult();
}
- @Getter
- @AllArgsConstructor
+ /**
+ * If {@link GobblinClusterConfigurationKeys#NON_BLOCKING_PLANNING_JOB_ENABLED} is enabled,
+ * this result object contains nothing; otherwise it can be used to carry
+ * any values returned from other task-driver instances.
+ */
static class DistributeJobResult implements ExecutionResult {
- boolean isEarlyStopped = false;
}
private class DistributeJobMonitor extends FutureTask<ExecutionResult>
implements JobExecutionMonitor {
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 5223c71..4a6571f 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixManager;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Striped;
+import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -41,6 +42,7 @@ import org.apache.gobblin.runtime.api.JobExecutionMonitor;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -234,7 +236,7 @@ class HelixRetriggeringJobCallable implements Callable {
// Check if any existing planning job is running
Optional<String> planningJobIdFromStore =
jobsMapping.getPlanningJobId(jobName);
-
+ boolean nonblocking = false;
// start of critical section to check if a job with same job name is
running
Lock jobLock = locks.get(jobName);
jobLock.lock();
@@ -290,6 +292,14 @@ class HelixRetriggeringJobCallable implements Callable {
builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
builder.setHelixMetrics(this.helixMetrics);
+ // Determine whether the distributed job launcher should wait for planning job completion.
+
+ Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps)
+ .withFallback(ConfigUtils.propertiesToConfig(sysProps));
+
+ nonblocking = ConfigUtils.getBoolean(combined,
GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED,
+
GobblinClusterConfigurationKeys.DEFAULT_NON_BLOCKING_PLANNING_JOB_ENABLED);
+
log.info("Planning job {} started.", newPlanningId);
GobblinHelixDistributeJobExecutionLauncher launcher = builder.build();
closer.register(launcher);
@@ -297,7 +307,7 @@ class HelixRetriggeringJobCallable implements Callable {
startTime = System.currentTimeMillis();
this.currentJobMonitor = launcher.launchJob(null);
- // make sure the planning job will be visible to other parallel
running threads,
+ // make sure the planning job is initialized (or visible) to other
parallel running threads,
// so that the same critical section check (querying Helix for job
completeness)
// can be applied.
HelixUtils.waitJobInitialization(planningJobManager, newPlanningId,
newPlanningId, 300_000);
@@ -310,11 +320,16 @@ class HelixRetriggeringJobCallable implements Callable {
// we can remove the job spec from the catalog because Helix will drive
this job to the end.
this.deleteJobSpec();
+ // In non-blocking mode, this get() only guarantees that the planning job has been submitted.
+ // It does not guarantee that the job has finished, because the launcher does not wait for Helix completion.
this.currentJobMonitor.get();
this.currentJobMonitor = null;
- log.info("Planning job {} finished.", newPlanningId);
-
this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime);
-
+ if (nonblocking) {
+ log.info("Planning job {} submitted successfully.", newPlanningId);
+ } else {
+ log.info("Planning job {} finished.", newPlanningId);
+
this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime);
+ }
} catch (Exception e) {
if (startTime != 0) {
this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime);
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 69ae823..8b587ef 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -47,7 +47,6 @@ import static org.apache.helix.task.TaskState.STOPPED;
*
* @author Yinan Li
*/
-@Alpha
@Slf4j
public class HelixUtils {
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
index 0d973af..d16a62f 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
@@ -93,7 +93,6 @@ public class TaskRunnerSuiteForJobFactoryTest extends
TaskRunnerSuiteThreadModel
protected DistributeJobResult getResultFromUserContent() {
DistributeJobResult rst = super.getResultFromUserContent();
- Assert.assertTrue(!rst.isEarlyStopped());
String jobName =
this.jobPlanningProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
try {
Assert.assertFalse(this.jobsMapping.getPlanningJobId(jobName).isPresent());