This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d92192  [GOBBLIN-1230] add option to add spec executor configs to gaas job
4d92192 is described below

commit 4d921928b9ca35b81ded594bfa8fc1301c6ff9e7
Author: Arjun <[email protected]>
AuthorDate: Wed Sep 16 16:39:45 2020 -0700

    [GOBBLIN-1230] add option to add spec executor configs to gaas job
    
    Closes #3076 from arjun4084346/gaasmetrics
---
 .../gobblin/runtime/AbstractJobLauncher.java       |  8 ++++++
 .../gobblin/service/monitoring/JobStatus.java      |  7 +++--
 .../service/modules/spec/JobExecutionPlan.java     | 31 +++++++++++++++-------
 .../spec/JobExecutionPlanDagFactoryTest.java       |  2 +-
 4 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index cce4c38..595864b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -120,6 +120,8 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
 
   public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";
 
+  public static final String NUM_WORKUNITS = "numWorkUnits";
+
   /** Making {@link AbstractJobLauncher} capable of loading multiple job 
templates.
    * Keep the original {@link #GOBBLIN_JOB_TEMPLATE_KEY} for 
backward-compatibility.
    * TODO: Expand support to Gobblin-as-a-Service in FlowTemplateCatalog.
@@ -430,6 +432,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           jobState.setState(JobState.RunningState.FAILED);
           String errMsg = "Failed to get work units for job " + jobId;
           this.jobContext.getJobState().setJobFailureMessage(errMsg);
+          this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
           throw new JobException(errMsg);
         }
 
@@ -439,9 +442,14 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           LOG.warn("No work units have been created for job " + jobId);
           jobState.setState(JobState.RunningState.COMMITTED);
           isWorkUnitsEmpty = true;
+          this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
           return;
         }
 
+        // If it is a streaming source, workunits cannot be counted
+        this.jobContext.getJobState().setProp(NUM_WORKUNITS,
+            workUnitStream.isSafeToMaterialize() ? 
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
+
         //Initialize writer and converter(s)
         closer.register(WriterInitializerFactory.newInstace(jobState, 
workUnitStream)).initialize();
         closer.register(ConverterInitializerFactory.newInstance(jobState, 
workUnitStream)).initialize();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index b1f3371..a436ce0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -17,16 +17,14 @@
 
 package org.apache.gobblin.service.monitoring;
 
-import org.apache.gobblin.annotation.Alpha;
-
 import lombok.Builder;
 import lombok.Getter;
+import lombok.Setter;
 
 
 /**
  * Contains attributes that describe job status.
  */
-@Alpha
 @Builder
 @Getter
 public class JobStatus {
@@ -41,7 +39,8 @@ public class JobStatus {
   private final long orchestratedTime;
   private final long startTime;
   private final long endTime;
-  private final String message;
+  @Setter
+  private String message;
   private final long processedCount;
   private final String lowWatermark;
   private final String highWatermark;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 5697a64..f88b561 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.spec;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
@@ -72,17 +73,26 @@ public class JobExecutionPlan {
 
     public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, 
SpecExecutor specExecutor, Long flowExecutionId, Config sysConfig)
         throws URISyntaxException {
-        JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId, 
sysConfig);
-        return new JobExecutionPlan(jobSpec, specExecutor);
+        try {
+          JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId, 
sysConfig, specExecutor.getConfig().get());
+          return new JobExecutionPlan(jobSpec, specExecutor);
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException(e);
+        }
     }
 
     /**
      * Given a resolved job config, this helper method converts the config to 
a {@link JobSpec}.
-     * @param jobConfig resolved job config.
      * @param flowSpec input FlowSpec.
+     * @param jobConfig resolved job config.
+     * @param flowExecutionId flow execution id for the flow
+     * @param sysConfig gobblin service level configs
+     * @param specExecutorConfig configs for the {@link SpecExecutor} of this 
{@link JobExecutionPlan}
      * @return a {@link JobSpec} corresponding to the resolved job config.
+     * @throws URISyntaxException if creation of {@link JobSpec} URI fails
      */
-    private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, 
Long flowExecutionId, Config sysConfig) throws URISyntaxException {
+    private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, 
Long flowExecutionId, Config sysConfig, Config specExecutorConfig)
+        throws URISyntaxException {
       Config flowConfig = flowSpec.getConfig();
 
       String flowName = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_NAME_KEY, "");
@@ -134,7 +144,7 @@ public class JobExecutionPlan {
       //Add tracking config to JobSpec.
       addTrackingEventConfig(jobSpec, sysConfig);
 
-      addAdditionalConfig(jobSpec, sysConfig);
+      addAdditionalConfig(jobSpec, sysConfig, specExecutorConfig);
 
       // Add dynamic config to jobSpec if a dynamic config generator is 
specified in sysConfig
       DynamicConfigGenerator dynamicConfigGenerator = 
DynamicConfigGeneratorFactory.createDynamicConfigGenerator(sysConfig);
@@ -149,12 +159,13 @@ public class JobExecutionPlan {
 
     /**
      * A method to add any additional configurations to a JobSpec which need 
to be passed to the {@link SpecExecutor}.
-     * This enables {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s
-     * to be emitted from each Gobblin job orchestrated by 
Gobblin-as-a-Service, which will then be used for tracking the
-     * execution status of the job.
+     * This enables {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s 
to be emitted from each Gobblin job
+     * orchestrated by Gobblin-as-a-Service, which will then be used for 
tracking the execution status of the job.
      * @param jobSpec representing a fully resolved {@link JobSpec}.
+     * @param sysConfig gobblin service level configs
+     * @param specExecutorConfig configs for the {@link SpecExecutor} of this 
{@link JobExecutionPlan}
      */
-    private static void addAdditionalConfig(JobSpec jobSpec, Config sysConfig) 
{
+    private static void addAdditionalConfig(JobSpec jobSpec, Config sysConfig, 
Config specExecutorConfig) {
       if 
(!(sysConfig.hasPath(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY)
           && !Strings.isNullOrEmpty(ConfigUtils.getString(sysConfig, 
ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY, ""))
           && 
sysConfig.hasPath(sysConfig.getString(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY))))
 {
@@ -165,6 +176,8 @@ public class JobExecutionPlan {
 
       Config config = 
jobSpec.getConfig().withFallback(ConfigUtils.getConfigOrEmpty(sysConfig, 
additionalConfigsPrefix));
 
+      config = 
config.withFallback(ConfigUtils.getConfigOrEmpty(specExecutorConfig, 
additionalConfigsPrefix));
+
       if (!config.isEmpty()) {
         jobSpec.setConfig(config);
       }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index d29ed74..fdc2929 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -148,7 +148,7 @@ public class JobExecutionPlanDagFactoryTest {
 
       FlowSpec flowSpec = 
FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
       jobExecutionPlans.add(new 
JobExecutionPlan.Factory().createPlan(flowSpec, 
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
-          ConfigValueFactory.fromAnyRef("testUri")), null, 0L, 
ConfigFactory.empty()));
+          ConfigValueFactory.fromAnyRef("testUri")), new 
InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty()));
     }
 
     Dag<JobExecutionPlan> dag = new 
JobExecutionPlanDagFactory().createDag(jobExecutionPlans);

Reply via email to