This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 678c8f0 [GOBBLIN-1041][Gobblin-1041] send metrics for workunit
creation time
678c8f0 is described below
commit 678c8f0538287990f44974bc08d5a8a7f4d4876c
Author: Arjun <[email protected]>
AuthorDate: Mon Feb 10 17:51:55 2020 -0800
[GOBBLIN-1041][Gobblin-1041] send metrics for workunit creation time
Closes #2881 from arjun4084346/benchmarking
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../metrics/reporter/util/MetricReportUtils.java | 3 ++-
.../modules/orchestration/AzkabanSpecExecutor.java | 18 +++++++++------
.../gobblin/runtime/AbstractJobLauncher.java | 27 ++++++++++++++++------
.../service/modules/spec/JobExecutionPlan.java | 25 ++++++++++++++++++++
.../topology/ConfigBasedTopologySpecFactory.java | 6 ++++-
6 files changed, 64 insertions(+), 16 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 79d1280..df86caa 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -938,6 +938,7 @@ public class ConfigurationKeys {
*/
public static final String SPECEXECUTOR_INSTANCE_URI_KEY =
"specExecInstance.uri";
public static final String SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY =
"specExecInstance.capabilities";
+ public static final String SPECEXECUTOR_CONFIGS_PREFIX_KEY =
"specExecutor.additional.configs.key";
/***
* Configuration properties related to Spec Producer
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index 3cdb0a4..2d49606 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -38,9 +38,10 @@ public class MetricReportUtils {
public static final int SCHEMA_VERSION = 1;
private static Optional<SpecificDatumReader<MetricReport>> READER =
Optional.absent();
- // This prefix can be used to distinguish metrics reported by GobblinService
from other metrics reported by Gobblin
+ // These prefixes can be used to distinguish metrics reported by
GobblinService from other metrics reported by Gobblin
// This can be used in conjunction with MetricNameRegexFilter to filter out
metrics in any MetricReporter
public static final String GOBBLIN_SERVICE_METRICS_PREFIX = "GobblinService";
+ public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
/**
* Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte
array representing a json input.
diff --git
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
index ba78904..0e6c4a0 100644
---
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
+++
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
@@ -18,6 +18,12 @@ package org.apache.gobblin.service.modules.orchestration;
import java.util.concurrent.Future;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
@@ -25,12 +31,6 @@ import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.slf4j.Logger;
-
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
public class AzkabanSpecExecutor extends AbstractSpecExecutor {
@@ -55,7 +55,11 @@ public class AzkabanSpecExecutor extends
AbstractSpecExecutor {
azkabanSpecProducer = (SpecProducer<Spec>) GobblinConstructorUtils
.invokeLongestConstructor(producerClass, _config);
} catch (ReflectiveOperationException e) {
- throw new RuntimeException("Could not instantiate kafka pusher", e);
+ if (e.getCause() != null) {
+ throw new RuntimeException("Could not instantiate spec producer",
e.getCause());
+ } else {
+ throw new RuntimeException("Could not instantiate spec producer", e);
+ }
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index f7cecd4..bd01a51 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -21,25 +21,20 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
-import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
-import org.apache.gobblin.util.ConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import com.codahale.metrics.MetricRegistry;
import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
import com.google.common.base.Optional;
@@ -65,15 +60,22 @@ import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
+import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventName;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.runtime.api.EventMetadataGenerator;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.gobblin.runtime.listeners.CloseableJobListener;
import org.apache.gobblin.runtime.listeners.JobExecutionEventSubmitterListener;
import org.apache.gobblin.runtime.listeners.JobListener;
@@ -92,6 +94,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ClusterNameTags;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
@@ -412,6 +415,16 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
workUnitsCreationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
EventName.WORK_UNITS_CREATION));
+ if (this.runtimeMetricContext.isPresent()) {
+ String workunitCreationGaugeName = MetricRegistry
+ .name(MetricReportUtils.GOBBLIN_JOB_METRICS_PREFIX,
TimingEvent.LauncherTimings.WORK_UNITS_CREATION,
+ jobState.getJobName());
+ long workUnitsCreationTime = workUnitsCreationTimer.getDuration() /
TimeUnit.SECONDS.toMillis(1);
+ ContextAwareGauge<Integer> workunitCreationGauge =
this.runtimeMetricContext.get()
+ .newContextAwareGauge(workunitCreationGaugeName, () -> (int)
workUnitsCreationTime);
+ this.runtimeMetricContext.get().register(workunitCreationGaugeName,
workunitCreationGauge);
+ }
+
// The absence means there is something wrong getting the work units
if (workUnitStream == null || workUnitStream.getWorkUnits() == null) {
this.eventSubmitter.submit(JobEvent.WORK_UNITS_MISSING);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 027bcc6..5697a64 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -134,6 +134,8 @@ public class JobExecutionPlan {
//Add tracking config to JobSpec.
addTrackingEventConfig(jobSpec, sysConfig);
+ addAdditionalConfig(jobSpec, sysConfig);
+
// Add dynamic config to jobSpec if a dynamic config generator is
specified in sysConfig
DynamicConfigGenerator dynamicConfigGenerator =
DynamicConfigGeneratorFactory.createDynamicConfigGenerator(sysConfig);
Config dynamicConfig =
dynamicConfigGenerator.generateDynamicConfig(jobSpec.getConfig().withFallback(sysConfig));
@@ -146,6 +148,29 @@ public class JobExecutionPlan {
}
/**
+ * A method to add any additional configurations to a JobSpec which need
to be passed to the {@link SpecExecutor}.
+ * The config found in sysConfig at the path named by the value of
{@link ConfigurationKeys#SPECEXECUTOR_CONFIGS_PREFIX_KEY} is merged in as a fallback to the JobSpec's config;
+ * nothing is changed when that key is unset or empty, or when sysConfig
has no such path.
+ * @param jobSpec representing a fully resolved {@link JobSpec}.
+ * @param sysConfig config holding the prefix key and the additional
configs it points to.
+ */
+ private static void addAdditionalConfig(JobSpec jobSpec, Config sysConfig)
{
+ if
(!(sysConfig.hasPath(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY)
+ && !Strings.isNullOrEmpty(ConfigUtils.getString(sysConfig,
ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY, ""))
+ &&
sysConfig.hasPath(sysConfig.getString(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY))))
{
+ return;
+ }
+
+ String additionalConfigsPrefix =
sysConfig.getString(ConfigurationKeys.SPECEXECUTOR_CONFIGS_PREFIX_KEY);
+
+ Config config =
jobSpec.getConfig().withFallback(ConfigUtils.getConfigOrEmpty(sysConfig,
additionalConfigsPrefix));
+
+ if (!config.isEmpty()) {
+ jobSpec.setConfig(config);
+ }
+ }
+
+ /**
* A method to add tracking event configurations to a JobSpec.
* This enables {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s
* to be emitted from each Gobblin job orchestrated by
Gobblin-as-a-Service, which will then be used for tracking the
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
index fb7c1b0..d1bb267 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
@@ -88,7 +88,11 @@ public class ConfigBasedTopologySpecFactory implements
TopologySpecFactory {
.resolve(specExecutorClass)), topologyConfig);
} catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
| ClassNotFoundException e) {
- throw new RuntimeException(e);
+ if (e.getCause() != null) {
+ throw new RuntimeException(e.getCause());
+ } else {
+ throw new RuntimeException(e);
+ }
}
TopologySpec.Builder topologySpecBuilder = TopologySpec