This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new cd53dcf [GOBBLIN-1027] add metrics for users running gaas jobs cd53dcf is described below commit cd53dcfa2c046f53db29689e1d687bf4412565aa Author: Arjun <ab...@linkedin.com> AuthorDate: Fri Jan 17 13:16:20 2020 -0800 [GOBBLIN-1027] add metrics for users running gaas jobs Closes #2870 from arjun4084346/gaasUsersMetrics --- .../apache/gobblin/metrics/ServiceMetricNames.java | 1 + .../orchestration/AzkabanProjectConfig.java | 5 ++-- .../apache/gobblin/runtime/api/SpecCatalog.java | 2 +- .../service/modules/orchestration/DagManager.java | 33 ++++++++++++++++++++++ .../modules/flow/MultiHopFlowCompilerTest.java | 3 +- 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 3a8ecaa..9e79c61 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -37,5 +37,6 @@ public class ServiceMetricNames { public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow"; public static final String RUNNING_FLOWS_COUNTER = "RunningFlows"; + public static final String SERVICE_USERS = "ServiceUsers"; public static final String COMPILED = "Compiled"; } diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java index 4b8bb9c..3ad9785 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java +++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java @@ -40,21 +40,20 @@ import com.typesafe.config.ConfigFactory; */ public class AzkabanProjectConfig { private final String azkabanServerUrl; - private final String azkabanProjectName; private final String azkabanProjectDescription; private final String azkabanProjectFlowName; private final String azkabanGroupAdminUsers; private final Optional<String> azkabanUserToProxy; - private final Optional<List<String>> azkabanZipJarNames; private final Optional<String> azkabanZipJarUrlTemplate; private final Optional<String> azkabanZipJarVersion; private final Optional<List<String>> azkabanZipAdditionalFiles; private final Boolean failIfJarNotFound; - private final JobSpec jobSpec; + public static final String USER_TO_PROXY = "user.to.proxy"; + public AzkabanProjectConfig(JobSpec jobSpec) { // Extract config objects this.jobSpec = jobSpec; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java index d5c1e35..cc55cc4 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java @@ -119,7 +119,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab } public void updateGetSpecTime(long startTime) { - log.info("updateGetSpecTime..."); + log.debug("updateGetSpecTime..."); Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 62b1cfc..1dff94a 100644 --- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -36,6 +36,8 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; + import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.base.Optional; @@ -63,6 +65,8 @@ import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.FlowConfigResourceLocalHandler; +import org.apache.gobblin.service.RequesterService; +import org.apache.gobblin.service.ServiceRequester; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; @@ -701,6 +705,7 @@ public class DagManager extends AbstractIdleService { if (this.metricContext != null) { getRunningJobsCounter(dagNode).inc(); + getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.inc()); } addSpecFuture.get(); @@ -738,6 +743,7 @@ public class DagManager extends AbstractIdleService { if (this.metricContext != null) { getRunningJobsCounter(dagNode).dec(); + getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.dec()); } switch (jobStatus) { @@ -788,6 +794,33 @@ public class DagManager extends AbstractIdleService { dagNode.getValue().getSpecExecutor().getUri().toString())); } + private List<ContextAwareCounter> getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) { + Config configs = dagNode.getValue().getJobSpec().getConfig(); + String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null); + List<ContextAwareCounter> counters = new ArrayList<>(); + + 
if (StringUtils.isNotEmpty(proxy)) { + counters.add(metricContext.contextAwareCounter( + MetricRegistry.name( + MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, + ServiceMetricNames.SERVICE_USERS, proxy))); + } + + try { + String serializedRequesters = ConfigUtils.getString(configs, RequesterService.REQUESTER_LIST, null); + if (StringUtils.isNotEmpty(serializedRequesters)) { + List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters); + for (ServiceRequester requester : requesters) { + counters.add(metricContext.contextAwareCounter(MetricRegistry + .name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName()))); + } + } + } catch (IOException e) { + log.error("Error while fetching requester list.", e); + } + + return counters; + } /** * Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state. */ diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index 644b75a..c03aa39 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -84,6 +84,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdge; import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory; import org.apache.gobblin.service.modules.flowgraph.FlowGraph; import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog; import org.apache.gobblin.util.CompletedFuture; @@ -648,7 +649,7 @@ public class 
MultiHopFlowCompilerTest { Assert.assertTrue(dag.isEmpty()); Assert.assertEquals(spec.getCompilationErrors().size(), 1); - Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains("user.to.proxy")); + Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains(AzkabanProjectConfig.USER_TO_PROXY)); } @Test (dependsOnMethods = "testUnresolvedFlow")