This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 678c8f0  [GOBBLIN-1041][Gobblin-1041] send metrics for workunit 
creation time
678c8f0 is described below

commit 678c8f0538287990f44974bc08d5a8a7f4d4876c
Author: Arjun <[email protected]>
AuthorDate: Mon Feb 10 17:51:55 2020 -0800

    [GOBBLIN-1041][Gobblin-1041] send metrics for workunit creation time
    
    Closes #2881 from arjun4084346/benchmarking
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../metrics/reporter/util/MetricReportUtils.java   |  3 ++-
 .../modules/orchestration/AzkabanSpecExecutor.java | 18 +++++++++------
 .../gobblin/runtime/AbstractJobLauncher.java       | 27 ++++++++++++++++------
 .../service/modules/spec/JobExecutionPlan.java     | 25 ++++++++++++++++++++
 .../topology/ConfigBasedTopologySpecFactory.java   |  6 ++++-
 6 files changed, 64 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 79d1280..df86caa 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -938,6 +938,7 @@ public class ConfigurationKeys {
    */
   public static final String SPECEXECUTOR_INSTANCE_URI_KEY = 
"specExecInstance.uri";
   public static final String SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY = 
"specExecInstance.capabilities";
+  public static final String SPECEXECUTOR_CONFIGS_PREFIX_KEY = 
"specExecutor.additional.configs.key";
 
   /***
    * Configuration properties related to Spec Producer
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index 3cdb0a4..2d49606 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -38,9 +38,10 @@ public class MetricReportUtils {
 
   public static final int SCHEMA_VERSION = 1;
   private static Optional<SpecificDatumReader<MetricReport>> READER = 
Optional.absent();
-  // This prefix can be used to distinguish metrics reported by GobblinService 
from other metrics reported by Gobblin
+  // These prefixes can be used to distinguish metrics reported by 
GobblinService from other metrics reported by Gobblin
   // This can be used in conjunction with MetricNameRegexFilter to filter out 
metrics in any MetricReporter
   public static final String GOBBLIN_SERVICE_METRICS_PREFIX = "GobblinService";
+  public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
 
   /**
    * Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte 
array representing a json input.
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
index ba78904..0e6c4a0 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
@@ -18,6 +18,12 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.util.concurrent.Future;
 
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
@@ -25,12 +31,6 @@ import org.apache.gobblin.util.CompletedFuture;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
-import org.slf4j.Logger;
-
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
 
 public class AzkabanSpecExecutor extends AbstractSpecExecutor {
 
@@ -55,7 +55,11 @@ public class AzkabanSpecExecutor extends 
AbstractSpecExecutor {
       azkabanSpecProducer = (SpecProducer<Spec>) GobblinConstructorUtils
           .invokeLongestConstructor(producerClass, _config);
     } catch (ReflectiveOperationException e) {
-      throw new RuntimeException("Could not instantiate kafka pusher", e);
+      if (e.getCause() != null) {
+        throw new RuntimeException("Could not instantiate spec producer", 
e.getCause());
+      } else {
+        throw new RuntimeException("Could not instantiate spec producer", e);
+      }
     }
   }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index f7cecd4..bd01a51 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -21,25 +21,20 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
-import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
-import org.apache.gobblin.util.ConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
@@ -65,15 +60,22 @@ import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.GobblinMetricsRegistry;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventName;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.runtime.api.EventMetadataGenerator;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.apache.gobblin.runtime.listeners.CloseableJobListener;
 import org.apache.gobblin.runtime.listeners.JobExecutionEventSubmitterListener;
 import org.apache.gobblin.runtime.listeners.JobListener;
@@ -92,6 +94,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ClusterNameTags;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.JobLauncherUtils;
@@ -412,6 +415,16 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         
workUnitsCreationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
             EventName.WORK_UNITS_CREATION));
 
+        if (this.runtimeMetricContext.isPresent()) {
+          String workunitCreationGaugeName = MetricRegistry
+              .name(MetricReportUtils.GOBBLIN_JOB_METRICS_PREFIX, 
TimingEvent.LauncherTimings.WORK_UNITS_CREATION,
+                  jobState.getJobName());
+          long workUnitsCreationTime = workUnitsCreationTimer.getDuration() / 
TimeUnit.SECONDS.toMillis(1);
+          ContextAwareGauge<Integer> workunitCreationGauge = 
this.runtimeMetricContext.get()
+              .newContextAwareGauge(workunitCreationGaugeName, () -> (int) 
workUnitsCreationTime);
+          this.runtimeMetricContext.get().register(workunitCreationGaugeName, 
workunitCreationGauge);
+        }
+
         // The absence means there is something wrong getting the work units
         if (workUnitStream == null || workUnitStream.getWorkUnits() == null) {
           this.eventSubmitter.submit(JobEvent.WORK_UNITS_MISSING);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 027bcc6..5697a64 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -134,6 +134,8 @@ public class JobExecutionPlan {
       //Add tracking config to JobSpec.
       addTrackingEventConfig(jobSpec, sysConfig);
 
+      addAdditionalConfig(jobSpec, sysConfig);
+
       // Add dynamic config to jobSpec if a dynamic config generator is 
specified in sysConfig
       DynamicConfigGenerator dynamicConfigGenerator = 
DynamicConfigGeneratorFactory.createDynamicConfigGenerator(sysConfig);
       Config dynamicConfig = 
dynamicConfigGenerator.generateDynamicConfig(jobSpec.getConfig().withFallback(sysConfig));
@@ -146,6 +148,29 @@ public class JobExecutionPlan {
     }
 
     /**
+     * A method to add any additional configurations to a JobSpec which need 
to be passed to the {@link SpecExecutor}.
+     * This enables {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s
+     * to be emitted from each Gobblin job orchestrated by 
Gobblin-as-a-Service, which will then be used for tracking the
+     * execution status of the job.
+     * @param jobSpec representing a fully resolved {@link JobSpec}.
+     */
+    private static void addAdditionalConfig(JobSpec jobSpec, Config sysConfig) 
{
+      if 
(!(sysConfig.hasPath(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY)
+          && !Strings.isNullOrEmpty(ConfigUtils.getString(sysConfig, 
ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY, ""))
+          && 
sysConfig.hasPath(sysConfig.getString(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY))))
 {
+        return;
+      }
+
+      String additionalConfigsPrefix = 
sysConfig.getString(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY);
+
+      Config config = 
jobSpec.getConfig().withFallback(ConfigUtils.getConfigOrEmpty(sysConfig, 
additionalConfigsPrefix));
+
+      if (!config.isEmpty()) {
+        jobSpec.setConfig(config);
+      }
+    }
+
+    /**
      * A method to add tracking event configurations to a JobSpec.
      * This enables {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s
      * to be emitted from each Gobblin job orchestrated by 
Gobblin-as-a-Service, which will then be used for tracking the
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
index fb7c1b0..d1bb267 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
@@ -88,7 +88,11 @@ public class ConfigBasedTopologySpecFactory implements 
TopologySpecFactory {
                 .resolve(specExecutorClass)), topologyConfig);
       } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
           | ClassNotFoundException e) {
-        throw new RuntimeException(e);
+        if (e.getCause() != null) {
+          throw new RuntimeException(e.getCause());
+        } else {
+          throw new RuntimeException(e);
+        }
       }
 
       TopologySpec.Builder topologySpecBuilder = TopologySpec

Reply via email to