This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4d92192 [GOBBLIN-1230] add option to add spec executor configs to
gaas job
4d92192 is described below
commit 4d921928b9ca35b81ded594bfa8fc1301c6ff9e7
Author: Arjun <[email protected]>
AuthorDate: Wed Sep 16 16:39:45 2020 -0700
[GOBBLIN-1230] add option to add spec executor configs to gaas job
Closes #3076 from arjun4084346/gaasmetrics
---
.../gobblin/runtime/AbstractJobLauncher.java | 8 ++++++
.../gobblin/service/monitoring/JobStatus.java | 7 +++--
.../service/modules/spec/JobExecutionPlan.java | 31 +++++++++++++++-------
.../spec/JobExecutionPlanDagFactoryTest.java | 2 +-
4 files changed, 34 insertions(+), 14 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index cce4c38..595864b 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -120,6 +120,8 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";
+ public static final String NUM_WORKUNITS = "numWorkUnits";
+
/** Making {@link AbstractJobLauncher} capable of loading multiple job
templates.
* Keep the original {@link #GOBBLIN_JOB_TEMPLATE_KEY} for
backward-compatibility.
* TODO: Expand support to Gobblin-as-a-Service in FlowTemplateCatalog.
@@ -430,6 +432,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
jobState.setState(JobState.RunningState.FAILED);
String errMsg = "Failed to get work units for job " + jobId;
this.jobContext.getJobState().setJobFailureMessage(errMsg);
+ this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
throw new JobException(errMsg);
}
@@ -439,9 +442,14 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
LOG.warn("No work units have been created for job " + jobId);
jobState.setState(JobState.RunningState.COMMITTED);
isWorkUnitsEmpty = true;
+ this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
return;
}
+ // If it is a streaming source, workunits cannot be counted
+ this.jobContext.getJobState().setProp(NUM_WORKUNITS,
+ workUnitStream.isSafeToMaterialize() ?
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
+
//Initialize writer and converter(s)
closer.register(WriterInitializerFactory.newInstace(jobState,
workUnitStream)).initialize();
closer.register(ConverterInitializerFactory.newInstance(jobState,
workUnitStream)).initialize();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index b1f3371..a436ce0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -17,16 +17,14 @@
package org.apache.gobblin.service.monitoring;
-import org.apache.gobblin.annotation.Alpha;
-
import lombok.Builder;
import lombok.Getter;
+import lombok.Setter;
/**
* Contains attributes that describe job status.
*/
-@Alpha
@Builder
@Getter
public class JobStatus {
@@ -41,7 +39,8 @@ public class JobStatus {
private final long orchestratedTime;
private final long startTime;
private final long endTime;
- private final String message;
+ @Setter
+ private String message;
private final long processedCount;
private final String lowWatermark;
private final String highWatermark;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 5697a64..f88b561 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.spec;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
@@ -72,17 +73,26 @@ public class JobExecutionPlan {
public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig,
SpecExecutor specExecutor, Long flowExecutionId, Config sysConfig)
throws URISyntaxException {
- JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId,
sysConfig);
- return new JobExecutionPlan(jobSpec, specExecutor);
+ try {
+ JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId,
sysConfig, specExecutor.getConfig().get());
+ return new JobExecutionPlan(jobSpec, specExecutor);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
}
/**
* Given a resolved job config, this helper method converts the config to
a {@link JobSpec}.
- * @param jobConfig resolved job config.
* @param flowSpec input FlowSpec.
+ * @param jobConfig resolved job config.
+ * @param flowExecutionId flow execution id for the flow
+ * @param sysConfig gobblin service level configs
+ * @param specExecutorConfig configs for the {@link SpecExecutor} of this
{@link JobExecutionPlan}
* @return a {@link JobSpec} corresponding to the resolved job config.
+ * @throws URISyntaxException if creation of {@link JobSpec} URI fails
*/
- private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig,
Long flowExecutionId, Config sysConfig) throws URISyntaxException {
+ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig,
Long flowExecutionId, Config sysConfig, Config specExecutorConfig)
+ throws URISyntaxException {
Config flowConfig = flowSpec.getConfig();
String flowName = ConfigUtils.getString(flowConfig,
ConfigurationKeys.FLOW_NAME_KEY, "");
@@ -134,7 +144,7 @@ public class JobExecutionPlan {
//Add tracking config to JobSpec.
addTrackingEventConfig(jobSpec, sysConfig);
- addAdditionalConfig(jobSpec, sysConfig);
+ addAdditionalConfig(jobSpec, sysConfig, specExecutorConfig);
// Add dynamic config to jobSpec if a dynamic config generator is
specified in sysConfig
DynamicConfigGenerator dynamicConfigGenerator =
DynamicConfigGeneratorFactory.createDynamicConfigGenerator(sysConfig);
@@ -149,12 +159,13 @@ public class JobExecutionPlan {
/**
* A method to add any additional configurations to a JobSpec which need
to be passed to the {@link SpecExecutor}.
- * This enables {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s
- * to be emitted from each Gobblin job orchestrated by
Gobblin-as-a-Service, which will then be used for tracking the
- * execution status of the job.
+ * This enables {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s
to be emitted from each Gobblin job
+ * orchestrated by Gobblin-as-a-Service, which will then be used for
tracking the execution status of the job.
* @param jobSpec representing a fully resolved {@link JobSpec}.
+ * @param sysConfig gobblin service level configs
+ * @param specExecutorConfig configs for the {@link SpecExecutor} of this
{@link JobExecutionPlan}
*/
- private static void addAdditionalConfig(JobSpec jobSpec, Config sysConfig)
{
+ private static void addAdditionalConfig(JobSpec jobSpec, Config sysConfig,
Config specExecutorConfig) {
if
(!(sysConfig.hasPath(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY)
&& !Strings.isNullOrEmpty(ConfigUtils.getString(sysConfig,
ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY, ""))
&&
sysConfig.hasPath(sysConfig.getString(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY))))
{
@@ -165,6 +176,8 @@ public class JobExecutionPlan {
Config config =
jobSpec.getConfig().withFallback(ConfigUtils.getConfigOrEmpty(sysConfig,
additionalConfigsPrefix));
+ config =
config.withFallback(ConfigUtils.getConfigOrEmpty(specExecutorConfig,
additionalConfigsPrefix));
+
if (!config.isEmpty()) {
jobSpec.setConfig(config);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index d29ed74..fdc2929 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -148,7 +148,7 @@ public class JobExecutionPlanDagFactoryTest {
FlowSpec flowSpec =
FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
jobExecutionPlans.add(new
JobExecutionPlan.Factory().createPlan(flowSpec,
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
- ConfigValueFactory.fromAnyRef("testUri")), null, 0L,
ConfigFactory.empty()));
+ ConfigValueFactory.fromAnyRef("testUri")), new
InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty()));
}
Dag<JobExecutionPlan> dag = new
JobExecutionPlanDagFactory().createDag(jobExecutionPlans);