This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b726a606c [GOBBLIN-1639] Prevent metrics reporting if configured, 
clean up workunit count metric (#3500)
b726a606c is described below

commit b726a606cea3deb567b1fdeeba9acbcc220e6d30
Author: William Lo <[email protected]>
AuthorDate: Wed May 18 15:34:54 2022 -0700

    [GOBBLIN-1639] Prevent metrics reporting if configured, clean up workunit 
count metric (#3500)
    
    * Move workunit count metrics emitting to Gobblin pipeline, add 
configuration to prevent metrics reporting if configured
    
    * rename config key
    
    * fix test
    
    * Fix checkstyle and other tests
    
    * Create a custom extensible hook for GaaS metrics on JobLauncher
    
    * Add tests
    
    * Fix failing tests
    
    * Address review
    
    * Address review comment
---
 .../gobblin/configuration/ConfigurationKeys.java   |   6 +-
 .../gobblin/runtime/AbstractJobLauncher.java       |  33 +++---
 .../metrics/DefaultGobblinJobMetricReporter.java   |  58 ++++++++++
 .../runtime/metrics/GobblinJobMetricReporter.java  |  30 ++++++
 .../metrics/ServiceGobblinJobMetricReporter.java   |  67 ++++++++++++
 .../gobblin/runtime/LocalJobLauncherTest.java      |  46 +++++++-
 .../metrics/GobblinJobMetricReporterTest.java      | 119 +++++++++++++++++++++
 .../service/modules/orchestration/DagManager.java  |   2 +-
 .../modules/orchestration/DagManagerUtils.java     |   4 +-
 .../service/modules/spec/JobExecutionPlan.java     |   2 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  38 -------
 .../modules/orchestration/DagManagerTest.java      |   4 +-
 .../spec/JobExecutionPlanDagFactoryTest.java       |   6 +-
 13 files changed, 343 insertions(+), 72 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c0aaee7ea..ec85883ae 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1014,7 +1014,7 @@ public class ConfigurationKeys {
   public static final String DATASET_BASE_OUTPUT_PATH_KEY = 
"gobblin.flow.dataset.baseOutputPath";
   public static final String DATASET_COMBINE_KEY = 
"gobblin.flow.dataset.combine";
   public static final String WHITELISTED_EDGE_IDS = 
"gobblin.flow.whitelistedEdgeIds";
-  public static final String GOBBLIN_FLOW_ISADHOC = 
"gobblin.internal.flow.isAdhoc";
+  public static final String GOBBLIN_OUTPUT_JOB_LEVEL_METRICS = 
"gobblin.job.outputJobLevelMetrics";
   /***
    * Configuration properties related to TopologySpec Store
    */
@@ -1131,4 +1131,8 @@ public class ConfigurationKeys {
    * */
   public static final String 
TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE = 
"gobblin.troubleshooter.inMemoryIssueRepository.maxSize";
   public static final int 
DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE = 100;
+
+  public static final String JOB_METRICS_REPORTER_CLASS_KEY = 
"gobblin.job.metrics.reporter.class";
+  public static final String DEFAULT_JOB_METRICS_REPORTER_CLASS = 
"org.apache.gobblin.runtime.metrics.DefaultGobblinJobMetricReporter";
+
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index b7a79bc36..30bfca5ce 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -33,9 +33,9 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.runtime.metrics.GobblinJobMetricReporter;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.source.InfiniteSource;
 import org.apache.gobblin.stream.WorkUnitChangeEvent;
@@ -43,7 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import com.codahale.metrics.MetricRegistry;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Function;
@@ -72,11 +71,9 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
-import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.GobblinMetricsRegistry;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventName;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -186,6 +183,8 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
 
   private final AutomaticTroubleshooter troubleshooter;
 
+  protected final GobblinJobMetricReporter gobblinJobMetricsReporter;
+
   public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> 
metadataTags)
       throws Exception {
     this(jobProps, metadataTags, null);
@@ -237,7 +236,6 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           });
 
       this.eventSubmitter = buildEventSubmitter(metadataTags);
-
       // Add all custom tags to the JobState so that tags are added to any new 
TaskState created
       GobblinMetrics.addCustomTagToState(this.jobContext.getJobState(), 
metadataTags);
 
@@ -247,6 +245,11 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
       this.multiEventMetadataGenerator = new MultiEventMetadataGenerator(
           PropertiesUtils.getPropAsList(jobProps, 
ConfigurationKeys.EVENT_METADATA_GENERATOR_CLASS_KEY,
               ConfigurationKeys.DEFAULT_EVENT_METADATA_GENERATOR_CLASS_KEY));
+
+      String jobMetricsReporterClassName = 
this.jobProps.getProperty(ConfigurationKeys.JOB_METRICS_REPORTER_CLASS_KEY, 
ConfigurationKeys.DEFAULT_JOB_METRICS_REPORTER_CLASS);
+      this.gobblinJobMetricsReporter =  (GobblinJobMetricReporter) 
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(jobMetricsReporterClassName),
+          this.runtimeMetricContext);
+
     } catch (Exception e) {
       unlockJob();
       throw e;
@@ -428,7 +431,8 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
     String jobId = this.jobContext.getJobId();
     final JobState jobState = this.jobContext.getJobState();
     boolean isWorkUnitsEmpty = false;
-
+    TimingEvent workUnitsCreationTimer =
+        
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
     try {
       MDC.put(ConfigurationKeys.JOB_NAME_KEY, this.jobContext.getJobName());
       MDC.put(ConfigurationKeys.JOB_KEY_KEY, this.jobContext.getJobKey());
@@ -451,8 +455,6 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           executeUnfinishedCommitSequences(jobState.getJobName());
         }
 
-        TimingEvent workUnitsCreationTimer =
-            
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
         Source<?, ?> source = this.jobContext.getSource();
         if (source instanceof InfiniteSource) {
           ((InfiniteSource) source).getEventBus().register(this);
@@ -470,16 +472,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         
workUnitsCreationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
             EventName.WORK_UNITS_CREATION));
 
-        if (this.runtimeMetricContext.isPresent()) {
-          String workunitCreationGaugeName = MetricRegistry
-              .name(ServiceMetricNames.GOBBLIN_JOB_METRICS_PREFIX, 
TimingEvent.LauncherTimings.WORK_UNITS_CREATION,
-                  jobState.getJobName());
-          long workUnitsCreationTime = workUnitsCreationTimer.getDuration() / 
TimeUnit.SECONDS.toMillis(1);
-          ContextAwareGauge<Integer> workunitCreationGauge = 
this.runtimeMetricContext.get()
-              .newContextAwareGauge(workunitCreationGaugeName, () -> (int) 
workUnitsCreationTime);
-          this.runtimeMetricContext.get().register(workunitCreationGaugeName, 
workunitCreationGauge);
-        }
-
+        
this.gobblinJobMetricsReporter.reportWorkUnitCreationTimerMetrics(workUnitsCreationTimer,
 jobState);
         // The absence means there is something wrong getting the work units
         if (workUnitStream == null || workUnitStream.getWorkUnits() == null) {
           this.eventSubmitter.submit(JobEvent.WORK_UNITS_MISSING);
@@ -487,6 +480,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           String errMsg = "Failed to get work units for job " + jobId;
           this.jobContext.getJobState().setJobFailureMessage(errMsg);
           this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
+          this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(0, 
jobState);
           throw new JobException(errMsg);
         }
 
@@ -497,6 +491,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           jobState.setState(JobState.RunningState.COMMITTED);
           isWorkUnitsEmpty = true;
           this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
+          this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(0, 
jobState);
           return;
         }
 
@@ -563,7 +558,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           // If it is a streaming source, workunits cannot be counted
           this.jobContext.getJobState().setProp(NUM_WORKUNITS,
               workUnitStream.isSafeToMaterialize() ? 
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
-
+          
this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS),
 jobState);
           // dump the work unit if tracking logs are enabled
           if 
(jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
             workUnitStream = workUnitStream.transform(new Function<WorkUnit, 
WorkUnit>() {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/DefaultGobblinJobMetricReporter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/DefaultGobblinJobMetricReporter.java
new file mode 100644
index 000000000..bf3d745dd
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/DefaultGobblinJobMetricReporter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A metrics reporter that reports only the work unit creation timer, which is the 
current default behavior for all Gobblin jobs that do not come from GaaS.
+ * Metrics are emitted with JobMetrics as the prefix.
+ */
+public class DefaultGobblinJobMetricReporter implements 
GobblinJobMetricReporter {
+
+  private Optional<MetricContext> metricContext;
+
+  public DefaultGobblinJobMetricReporter(Optional<MetricContext> 
metricContext) {
+     this.metricContext = metricContext;
+  }
+
+  public void reportWorkUnitCreationTimerMetrics(TimingEvent 
workUnitsCreationTimer, JobState jobState) {
+    if (!this.metricContext.isPresent()) {
+      return;
+    }
+    String workunitCreationGaugeName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_JOB_METRICS_PREFIX,
+        TimingEvent.LauncherTimings.WORK_UNITS_CREATION, 
jobState.getJobName());
+    long workUnitsCreationTime = workUnitsCreationTimer.getDuration() / 
TimeUnit.SECONDS.toMillis(1);
+    ContextAwareGauge<Integer> workunitCreationGauge = this.metricContext.get()
+        .newContextAwareGauge(workunitCreationGaugeName, () -> (int) 
workUnitsCreationTime);
+    this.metricContext.get().register(workunitCreationGaugeName, 
workunitCreationGauge);
+  }
+
+  public void reportWorkUnitCountMetrics(int workUnitCount, JobState jobstate) 
{
+    return;
+  }
+
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporter.java
new file mode 100644
index 000000000..5df052186
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.metrics;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+
+
+public interface GobblinJobMetricReporter {
+
+   void reportWorkUnitCreationTimerMetrics(TimingEvent workUnitsCreationTimer, 
JobState jobState);
+
+   void reportWorkUnitCountMetrics(int workUnitCount, JobState jobState);
+
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/ServiceGobblinJobMetricReporter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/ServiceGobblinJobMetricReporter.java
new file mode 100644
index 000000000..66ce46b67
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/ServiceGobblinJobMetricReporter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.JobEvent;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A metrics reporter that reports job-level metrics for Gobblin-as-a-Service.
+ * Metrics are named GobblinService.FLOW_GROUP.FLOW_NAME.EDGE_ID.METRIC_NAME.
+ * If the edge ID does not exist because a different flowgraph is being used, the 
jobName is used in its place.
+ */
+public class ServiceGobblinJobMetricReporter implements 
GobblinJobMetricReporter {
+  static String FLOW_EDGE_ID_KEY = "flow.edge.id";
+  private Optional<MetricContext> metricContext;
+
+  public ServiceGobblinJobMetricReporter(Optional<MetricContext> 
metricContext) {
+    this.metricContext = metricContext;
+  }
+
+  public void reportWorkUnitCreationTimerMetrics(TimingEvent 
workUnitsCreationTimer, JobState jobState) {
+    if (!this.metricContext.isPresent() || 
!jobState.getPropAsBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, 
true)) {
+      return;
+    }
+    String workunitCreationGaugeName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY),
+        jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY), 
jobState.getProp(FLOW_EDGE_ID_KEY, jobState.getJobName()), 
TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
+    long workUnitsCreationTime = workUnitsCreationTimer.getDuration() / 
TimeUnit.SECONDS.toMillis(1);
+    ContextAwareGauge<Integer> workunitCreationGauge =
+        
this.metricContext.get().newContextAwareGauge(workunitCreationGaugeName, () -> 
(int) workUnitsCreationTime);
+    this.metricContext.get().register(workunitCreationGaugeName, 
workunitCreationGauge);
+  }
+
+  public void reportWorkUnitCountMetrics(int workUnitCount, JobState jobState) 
{
+    if (!this.metricContext.isPresent() || 
!jobState.getPropAsBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, 
true)) {
+      return;
+    }
+    String workunitCountGaugeName =  
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY),
+        jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY), 
jobState.getProp(FLOW_EDGE_ID_KEY, jobState.getJobName()), 
JobEvent.WORK_UNITS_CREATED);
+    ContextAwareGauge<Integer> workunitCountGauge = this.metricContext.get()
+        .newContextAwareGauge(workunitCountGaugeName, () -> 
Integer.valueOf(workUnitCount));
+    this.metricContext.get().register(workunitCountGaugeName, 
workunitCountGauge);
+  }
+}
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
index 1d781c732..a8916045a 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.gobblin.runtime;
 
+import com.codahale.metrics.Metric;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -27,6 +29,7 @@ import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.local.LocalJobLauncher;
+import org.apache.gobblin.runtime.metrics.ServiceGobblinJobMetricReporter;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.limiter.BaseLimiterType;
 import org.apache.gobblin.util.limiter.DefaultLimiterFactory;
@@ -63,8 +66,7 @@ public class LocalJobLauncherTest {
     
this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_ENABLED_KEY, 
"true");
     this.launcherProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, 
"true");
     
this.launcherProps.setProperty(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
 "false");
-    this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_URL_KEY,
-        testMetastoreDatabase.getJdbcUrl());
+    
this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_URL_KEY, 
testMetastoreDatabase.getJdbcUrl());
 
     StateStore<JobState.DatasetState> datasetStateStore =
         new 
FsStateStore<>(this.launcherProps.getProperty(ConfigurationKeys.STATE_STORE_FS_URI_KEY),
@@ -128,7 +130,6 @@ public class LocalJobLauncherTest {
     Assert.assertEquals(jobContext.getJobState().getProp("templated0"), "x");
     Assert.assertEquals(jobContext.getJobState().getProp("templated1"), "y");
 
-
     // Verify multi-resolution with inheritance.
     jobProps.setProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY,
         
"resource:///templates/test-multitemplate-with-inheritance.template,resource:///templates/test.template");
@@ -198,7 +199,7 @@ public class LocalJobLauncherTest {
     }
   }
 
-  @Test(groups = { "ignore" })
+  @Test(groups = {"ignore"})
   public void testCancelJob() throws Exception {
     this.jobLauncherTestHelper.runTestWithCancellation(loadJobProps());
   }
@@ -293,6 +294,41 @@ public class LocalJobLauncherTest {
     }
   }
 
+  @Test
+  public void testLaunchJobWithDefaultMetricsReporter() throws Exception {
+    Properties jobProps = loadJobProps();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
"-testDefaultMetricsReporter");
+    try {
+      JobContext jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+      Map<String, Metric> metrics = 
jobContext.getJobMetricsOptional().get().getMetricContext().getMetrics();
+      
Assert.assertTrue(metrics.containsKey("JobMetrics.WorkUnitsCreationTimer.GobblinTest1-testDefaultMetricsReporter"));
+
+    } finally {
+      
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+  }
+
+
+  @Test
+  public void testLaunchJobWithServiceMetricsReporter() throws Exception {
+    Properties jobProps = loadJobProps();
+    jobProps.setProperty(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, 
"true");
+    jobProps.setProperty(ConfigurationKeys.JOB_METRICS_REPORTER_CLASS_KEY, 
ServiceGobblinJobMetricReporter.class.getName());
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, 
"FlowName_FlowGroup_JobName_EdgeId_Hash");
+    jobProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "FlowGroup");
+    jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "FlowName");
+    jobProps.setProperty("flow.edge.id", "EdgeId");
+    try {
+      JobContext jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+      Map<String, Metric> metrics = 
jobContext.getJobMetricsOptional().get().getMetricContext().getMetrics();
+      
Assert.assertTrue(metrics.containsKey("GobblinService.FlowGroup.FlowName.EdgeId.WorkUnitsCreated"));
+      
Assert.assertTrue(metrics.containsKey("GobblinService.FlowGroup.FlowName.EdgeId.WorkUnitsCreationTimer"));
+    } finally {
+      
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+  }
+
   @AfterClass(alwaysRun = true)
   public void tearDown() throws IOException {
     if (testMetastoreDatabase != null) {
@@ -310,4 +346,4 @@ public class LocalJobLauncherTest {
 
     return jobProps;
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporterTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporterTest.java
new file mode 100644
index 000000000..fe84432f9
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporterTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.metrics;
+
+import com.codahale.metrics.Metric;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobLauncherTestHelper;
+import org.apache.gobblin.runtime.JobState;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+
+/**
+ * Unit test for {@link GobblinJobMetricReporter} and its implementation 
classes.
+ */
+@Test(groups = { "gobblin.runtime.local" })
+public class GobblinJobMetricReporterTest {
+
+  private Properties launcherProps;
+  private JobLauncherTestHelper jobLauncherTestHelper;
+  private ITestMetastoreDatabase testMetastoreDatabase;
+
+  @BeforeClass
+  public void startUp() throws Exception {
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    this.launcherProps = new Properties();
+    this.launcherProps.load(new 
FileReader("gobblin-test/resource/gobblin.test.properties"));
+    
this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_ENABLED_KEY, 
"true");
+    this.launcherProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, 
"true");
+    
this.launcherProps.setProperty(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
 "false");
+    
this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_URL_KEY, 
testMetastoreDatabase.getJdbcUrl());
+
+    StateStore<JobState.DatasetState> datasetStateStore =
+        new 
FsStateStore<>(this.launcherProps.getProperty(ConfigurationKeys.STATE_STORE_FS_URI_KEY),
+            
this.launcherProps.getProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY), 
JobState.DatasetState.class);
+
+    this.jobLauncherTestHelper = new JobLauncherTestHelper(this.launcherProps, 
datasetStateStore);
+  }
+
+  @Test
+  public void testLaunchJobWithDefaultMetricsReporter() throws Exception {
+    Properties jobProps = loadJobProps();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
"-testDefaultMetricsReporter");
+    try {
+      JobContext jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+      Map<String, Metric> metrics = 
jobContext.getJobMetricsOptional().get().getMetricContext().getMetrics();
+      
Assert.assertTrue(metrics.containsKey("JobMetrics.WorkUnitsCreationTimer.GobblinTest1-testDefaultMetricsReporter"));
+
+    } finally {
+      
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+  }
+
+
+  @Test
+  public void testLaunchJobWithServiceMetricsReporter() throws Exception {
+    Properties jobProps = loadJobProps();
+    jobProps.setProperty(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, 
"true");
+    jobProps.setProperty(ConfigurationKeys.JOB_METRICS_REPORTER_CLASS_KEY, 
ServiceGobblinJobMetricReporter.class.getName());
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, 
"FlowName_FlowGroup_JobName_EdgeId_Hash");
+    jobProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "FlowGroup");
+    jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "FlowName");
+    jobProps.setProperty("flow.edge.id", "EdgeId");
+    try {
+      JobContext jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+      Map<String, Metric> metrics = 
jobContext.getJobMetricsOptional().get().getMetricContext().getMetrics();
+      
Assert.assertTrue(metrics.containsKey("GobblinService.FlowGroup.FlowName.EdgeId.WorkUnitsCreated"));
+      
Assert.assertTrue(metrics.containsKey("GobblinService.FlowGroup.FlowName.EdgeId.WorkUnitsCreationTimer"));
+    } finally {
+      
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws IOException {
+    if (testMetastoreDatabase != null) {
+      testMetastoreDatabase.close();
+    }
+  }
+
+  private Properties loadJobProps() throws IOException {
+    Properties jobProps = new Properties();
+    jobProps.load(new 
FileReader("gobblin-test/resource/job-conf/GobblinTest1.pull"));
+    jobProps.putAll(this.launcherProps);
+    jobProps.setProperty(JobLauncherTestHelper.SOURCE_FILE_LIST_KEY,
+        "gobblin-test/resource/source/test.avro.0," + 
"gobblin-test/resource/source/test.avro.1,"
+            + "gobblin-test/resource/source/test.avro.2," + 
"gobblin-test/resource/source/test.avro.3");
+
+    return jobProps;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index d106aaddb..f57169553 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -681,7 +681,7 @@ public class DagManager extends AbstractIdleService {
 
       FlowId flowId = DagManagerUtils.getFlowId(dag);
       // Register flow-specific metrics only once per flow, and only if the flow is configured to output metrics
-      if (!flowGauges.containsKey(flowId.toString()) && 
!DagManagerUtils.isDagFromAdhocFlow(dag)) {
+      if (!flowGauges.containsKey(flowId.toString()) && 
DagManagerUtils.shouldFlowOutputMetrics(dag)) {
         String flowStateGaugeName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
flowId.getFlowGroup(),
             flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
         flowGauges.put(flowId.toString(), FlowState.RUNNING);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 5ec95793a..b59a9636e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -312,9 +312,9 @@ public class DagManagerUtils {
     return dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
   }
 
-  static boolean isDagFromAdhocFlow(Dag<JobExecutionPlan> dag) {
+  static boolean shouldFlowOutputMetrics(Dag<JobExecutionPlan> dag) {
     // metrics are tracked by default if the dag property is not configured 
(e.g. for old dags)
-    return ConfigUtils.getBoolean(getDagJobConfig(dag), 
ConfigurationKeys.GOBBLIN_FLOW_ISADHOC,false);
+    return ConfigUtils.getBoolean(getDagJobConfig(dag), 
ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, true);
   }
 
   static String getSpecExecutorName(DagNode<JobExecutionPlan> dagNode) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 89d8527ab..50d2a5e14 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -127,7 +127,7 @@ public class JobExecutionPlan {
           //Add flow execution id
           .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
ConfigValueFactory.fromAnyRef(flowExecutionId))
           // Remove schedule due to namespace conflict with azkaban schedule 
key, but still keep track if flow is scheduled or not
-          .withValue(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC, 
ConfigValueFactory.fromAnyRef(!jobSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)))
+          .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, 
ConfigValueFactory.fromAnyRef(jobSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)))
           .withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
           //Remove template uri
           .withoutPath(GOBBLIN_JOB_TEMPLATE_KEY)
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 0a372b3ce..363d5c744 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -22,14 +22,11 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.SortedMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.github.rholder.retry.Attempt;
@@ -53,11 +50,8 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.event.CountEventBuilder;
-import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -114,8 +108,6 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
   private final JobIssueEventHandler jobIssueEventHandler;
 
-  private final ConcurrentHashMap<String, Long> flowNameGroupToWorkUnitCount;
-
   private final Retryer<Void> persistJobStatusRetryer;
 
   public KafkaJobStatusMonitor(String topic, Config config, int numThreads, 
JobIssueEventHandler jobIssueEventHandler)
@@ -129,8 +121,6 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
     this.jobIssueEventHandler = jobIssueEventHandler;
 
-    this.flowNameGroupToWorkUnitCount = new ConcurrentHashMap<>();
-
     Config retryerOverridesConfig = 
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
         ? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
         : ConfigFactory.empty();
@@ -191,11 +181,6 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       }
     }
 
-    if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
-      emitWorkUnitCountMetric(gobblinTrackingEvent);
-      return;
-    }
-
     try {
       persistJobStatusRetryer.call(() -> {
         // re-create `jobStatus` on each attempt, since mutated within 
`addJobStatusToStateStore`
@@ -314,29 +299,6 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     return new org.apache.gobblin.configuration.State(mergedState);
   }
 
-  private void emitWorkUnitCountMetric(GobblinTrackingEvent event) {
-    Properties properties = new Properties();
-    properties.putAll(event.getMetadata());
-
-    Long numWorkUnits = 
Long.parseLong(properties.getProperty(CountEventBuilder.COUNT_KEY));
-    String workUnitCountName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
-        
properties.getProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD),
-        properties.getProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD),
-        JobEvent.WORK_UNITS_CREATED);
-
-    SortedMap<String, Gauge> existingGauges = 
this.getMetricContext().getGauges(
-        (name, metric) -> name.equals(workUnitCountName));
-
-    // If gauge for this flow name and group exists, then value will be 
updated by reference. Otherwise create
-    // a new gauge and save a reference to the value in the HashMap
-    this.flowNameGroupToWorkUnitCount.put(workUnitCountName, numWorkUnits);
-    if (existingGauges.isEmpty()) {
-      ContextAwareGauge gauge = 
this.getMetricContext().newContextAwareGauge(workUnitCountName,
-          () -> this.flowNameGroupToWorkUnitCount.get(workUnitCountName));
-      this.getMetricContext().register(workUnitCountName, gauge);
-    }
-  }
-
   public static String jobStatusTableName(String flowExecutionId, String 
jobGroup, String jobName) {
     return 
Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId,
 jobGroup, jobName, ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX);
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 53f4d3284..3b685011f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -953,7 +953,7 @@ public class DagManagerTest {
 
     Long flowId = System.currentTimeMillis();
     Dag<JobExecutionPlan> adhocDag = buildDag(String.valueOf(flowId), flowId, 
"FINISH_RUNNING", 1, "proxyUser",
-        
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC, 
true).build());    //Add a dag to the queue of dags
+        
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
 false).build());    //Add a dag to the queue of dags
     this.queue.offer(adhocDag);
 
     Iterator<JobStatus> jobStatusIterator1 =
@@ -976,7 +976,7 @@ public class DagManagerTest {
     
Assert.assertNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName0));
 
     Dag<JobExecutionPlan> scheduledDag = buildDag(String.valueOf(flowId+1), 
flowId+1, "FINISH_RUNNING", 1, "proxyUser",
-        
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC, 
false).build());
+        
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
 true).build());
     this.queue.offer(scheduledDag);
     this._dagManagerThread.run();
     String flowStateGaugeName1 = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
"group"+(flowId+1),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index d07775fb3..cba53dc67 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -199,9 +199,9 @@ public class JobExecutionPlanDagFactoryTest {
     Assert.assertEquals(dag2.getStartNodes().size(), 1);
     Assert.assertEquals(dag2.getEndNodes().size(), 1);
     Assert.assertEquals(dag2.getNodes().size(), 1);
-    // Dag1 is scheduled so should be adhoc, but not dag2
-    
Assert.assertFalse(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC));
-    
Assert.assertTrue(dag2.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC));
+    // Dag1 is scheduled (not adhoc) so it should output metrics; dag2 is adhoc so it should not
+    
Assert.assertTrue(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS));
+    
Assert.assertFalse(dag2.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS));
   }
 
 }
\ No newline at end of file

Reply via email to