This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b726a606c [GOBBLIN-1639] Prevent metrics reporting if configured,
clean up workunit count metric (#3500)
b726a606c is described below
commit b726a606cea3deb567b1fdeeba9acbcc220e6d30
Author: William Lo <[email protected]>
AuthorDate: Wed May 18 15:34:54 2022 -0700
[GOBBLIN-1639] Prevent metrics reporting if configured, clean up workunit
count metric (#3500)
* Move workunit count metrics emitting to Gobblin pipeline, add
configuration to prevent metrics reporting if configured
* rename config key
* fix test
* Fix checkstyle and other tests
* Create a custom extensible hook for GaaS metrics on JobLauncher
* Add tests
* Fix failing tests
* Address review
* Address review comment
---
.../gobblin/configuration/ConfigurationKeys.java | 6 +-
.../gobblin/runtime/AbstractJobLauncher.java | 33 +++---
.../metrics/DefaultGobblinJobMetricReporter.java | 58 ++++++++++
.../runtime/metrics/GobblinJobMetricReporter.java | 30 ++++++
.../metrics/ServiceGobblinJobMetricReporter.java | 67 ++++++++++++
.../gobblin/runtime/LocalJobLauncherTest.java | 46 +++++++-
.../metrics/GobblinJobMetricReporterTest.java | 119 +++++++++++++++++++++
.../service/modules/orchestration/DagManager.java | 2 +-
.../modules/orchestration/DagManagerUtils.java | 4 +-
.../service/modules/spec/JobExecutionPlan.java | 2 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 38 -------
.../modules/orchestration/DagManagerTest.java | 4 +-
.../spec/JobExecutionPlanDagFactoryTest.java | 6 +-
13 files changed, 343 insertions(+), 72 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c0aaee7ea..ec85883ae 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1014,7 +1014,7 @@ public class ConfigurationKeys {
public static final String DATASET_BASE_OUTPUT_PATH_KEY =
"gobblin.flow.dataset.baseOutputPath";
public static final String DATASET_COMBINE_KEY =
"gobblin.flow.dataset.combine";
public static final String WHITELISTED_EDGE_IDS =
"gobblin.flow.whitelistedEdgeIds";
- public static final String GOBBLIN_FLOW_ISADHOC =
"gobblin.internal.flow.isAdhoc";
+ public static final String GOBBLIN_OUTPUT_JOB_LEVEL_METRICS =
"gobblin.job.outputJobLevelMetrics";
/***
* Configuration properties related to TopologySpec Store
*/
@@ -1131,4 +1131,8 @@ public class ConfigurationKeys {
* */
public static final String
TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE =
"gobblin.troubleshooter.inMemoryIssueRepository.maxSize";
public static final int
DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE = 100;
+
+ public static final String JOB_METRICS_REPORTER_CLASS_KEY =
"gobblin.job.metrics.reporter.class";
+ public static final String DEFAULT_JOB_METRICS_REPORTER_CLASS =
"org.apache.gobblin.runtime.metrics.DefaultGobblinJobMetricReporter";
+
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index b7a79bc36..30bfca5ce 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -33,9 +33,9 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.runtime.metrics.GobblinJobMetricReporter;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.InfiniteSource;
import org.apache.gobblin.stream.WorkUnitChangeEvent;
@@ -43,7 +43,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
@@ -72,11 +71,9 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
-import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventName;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -186,6 +183,8 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
private final AutomaticTroubleshooter troubleshooter;
+ protected final GobblinJobMetricReporter gobblinJobMetricsReporter;
+
public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>>
metadataTags)
throws Exception {
this(jobProps, metadataTags, null);
@@ -237,7 +236,6 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
});
this.eventSubmitter = buildEventSubmitter(metadataTags);
-
// Add all custom tags to the JobState so that tags are added to any new
TaskState created
GobblinMetrics.addCustomTagToState(this.jobContext.getJobState(),
metadataTags);
@@ -247,6 +245,11 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
this.multiEventMetadataGenerator = new MultiEventMetadataGenerator(
PropertiesUtils.getPropAsList(jobProps,
ConfigurationKeys.EVENT_METADATA_GENERATOR_CLASS_KEY,
ConfigurationKeys.DEFAULT_EVENT_METADATA_GENERATOR_CLASS_KEY));
+
+ String jobMetricsReporterClassName =
this.jobProps.getProperty(ConfigurationKeys.JOB_METRICS_REPORTER_CLASS_KEY,
ConfigurationKeys.DEFAULT_JOB_METRICS_REPORTER_CLASS);
+ this.gobblinJobMetricsReporter = (GobblinJobMetricReporter)
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(jobMetricsReporterClassName),
+ this.runtimeMetricContext);
+
} catch (Exception e) {
unlockJob();
throw e;
@@ -428,7 +431,8 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
String jobId = this.jobContext.getJobId();
final JobState jobState = this.jobContext.getJobState();
boolean isWorkUnitsEmpty = false;
-
+ TimingEvent workUnitsCreationTimer =
+
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
try {
MDC.put(ConfigurationKeys.JOB_NAME_KEY, this.jobContext.getJobName());
MDC.put(ConfigurationKeys.JOB_KEY_KEY, this.jobContext.getJobKey());
@@ -451,8 +455,6 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
executeUnfinishedCommitSequences(jobState.getJobName());
}
- TimingEvent workUnitsCreationTimer =
-
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
Source<?, ?> source = this.jobContext.getSource();
if (source instanceof InfiniteSource) {
((InfiniteSource) source).getEventBus().register(this);
@@ -470,16 +472,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
workUnitsCreationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.WORK_UNITS_CREATION));
- if (this.runtimeMetricContext.isPresent()) {
- String workunitCreationGaugeName = MetricRegistry
- .name(ServiceMetricNames.GOBBLIN_JOB_METRICS_PREFIX,
TimingEvent.LauncherTimings.WORK_UNITS_CREATION,
- jobState.getJobName());
- long workUnitsCreationTime = workUnitsCreationTimer.getDuration() /
TimeUnit.SECONDS.toMillis(1);
- ContextAwareGauge<Integer> workunitCreationGauge =
this.runtimeMetricContext.get()
- .newContextAwareGauge(workunitCreationGaugeName, () -> (int)
workUnitsCreationTime);
- this.runtimeMetricContext.get().register(workunitCreationGaugeName,
workunitCreationGauge);
- }
-
+
this.gobblinJobMetricsReporter.reportWorkUnitCreationTimerMetrics(workUnitsCreationTimer,
jobState);
// The absence means there is something wrong getting the work units
if (workUnitStream == null || workUnitStream.getWorkUnits() == null) {
this.eventSubmitter.submit(JobEvent.WORK_UNITS_MISSING);
@@ -487,6 +480,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
String errMsg = "Failed to get work units for job " + jobId;
this.jobContext.getJobState().setJobFailureMessage(errMsg);
this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
+ this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(0,
jobState);
throw new JobException(errMsg);
}
@@ -497,6 +491,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
jobState.setState(JobState.RunningState.COMMITTED);
isWorkUnitsEmpty = true;
this.jobContext.getJobState().setProp(NUM_WORKUNITS, 0);
+ this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(0,
jobState);
return;
}
@@ -563,7 +558,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
// If it is a streaming source, workunits cannot be counted
this.jobContext.getJobState().setProp(NUM_WORKUNITS,
workUnitStream.isSafeToMaterialize() ?
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
-
+
this.gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS),
jobState);
// dump the work unit if tracking logs are enabled
if
(jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
workUnitStream = workUnitStream.transform(new Function<WorkUnit,
WorkUnit>() {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/DefaultGobblinJobMetricReporter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/DefaultGobblinJobMetricReporter.java
new file mode 100644
index 000000000..bf3d745dd
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/DefaultGobblinJobMetricReporter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A metrics reporter that reports only workunitsCreationTimer - which is the
current default behavior for all Gobblin jobs not emitted by GaaS
+ * Emit metrics with JobMetrics as the prefix
+ */
+public class DefaultGobblinJobMetricReporter implements
GobblinJobMetricReporter {
+
+ private Optional<MetricContext> metricContext;
+
+ public DefaultGobblinJobMetricReporter(Optional<MetricContext>
metricContext) {
+ this.metricContext = metricContext;
+ }
+
+ public void reportWorkUnitCreationTimerMetrics(TimingEvent
workUnitsCreationTimer, JobState jobState) {
+ if (!this.metricContext.isPresent()) {
+ return;
+ }
+ String workunitCreationGaugeName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_JOB_METRICS_PREFIX,
+ TimingEvent.LauncherTimings.WORK_UNITS_CREATION,
jobState.getJobName());
+ long workUnitsCreationTime = workUnitsCreationTimer.getDuration() /
TimeUnit.SECONDS.toMillis(1);
+ ContextAwareGauge<Integer> workunitCreationGauge = this.metricContext.get()
+ .newContextAwareGauge(workunitCreationGaugeName, () -> (int)
workUnitsCreationTime);
+ this.metricContext.get().register(workunitCreationGaugeName,
workunitCreationGauge);
+ }
+
+ public void reportWorkUnitCountMetrics(int workUnitCount, JobState jobstate)
{
+ return;
+ }
+
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporter.java
new file mode 100644
index 000000000..5df052186
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.metrics;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+
+
+public interface GobblinJobMetricReporter {
+
+ void reportWorkUnitCreationTimerMetrics(TimingEvent workUnitsCreationTimer,
JobState jobState);
+
+ void reportWorkUnitCountMetrics(int workUnitCount, JobState jobState);
+
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/ServiceGobblinJobMetricReporter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/ServiceGobblinJobMetricReporter.java
new file mode 100644
index 000000000..66ce46b67
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/ServiceGobblinJobMetricReporter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.JobEvent;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A metrics reporter to report job-level metrics to Gobblin-as-a-Service
+ * Metrics should have the name FLOW_GROUP.FLOW_NAME.EDGE_ID.METRIC_NAME
+ * If edge ID does not exist due to a different flowgraph being used, use the
jobName as default
+ */
+public class ServiceGobblinJobMetricReporter implements
GobblinJobMetricReporter {
+ static String FLOW_EDGE_ID_KEY = "flow.edge.id";
+ private Optional<MetricContext> metricContext;
+
+ public ServiceGobblinJobMetricReporter(Optional<MetricContext>
metricContext) {
+ this.metricContext = metricContext;
+ }
+
+ public void reportWorkUnitCreationTimerMetrics(TimingEvent
workUnitsCreationTimer, JobState jobState) {
+ if (!this.metricContext.isPresent() ||
!jobState.getPropAsBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
true)) {
+ return;
+ }
+ String workunitCreationGaugeName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY),
+ jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY),
jobState.getProp(FLOW_EDGE_ID_KEY, jobState.getJobName()),
TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
+ long workUnitsCreationTime = workUnitsCreationTimer.getDuration() /
TimeUnit.SECONDS.toMillis(1);
+ ContextAwareGauge<Integer> workunitCreationGauge =
+
this.metricContext.get().newContextAwareGauge(workunitCreationGaugeName, () ->
(int) workUnitsCreationTime);
+ this.metricContext.get().register(workunitCreationGaugeName,
workunitCreationGauge);
+ }
+
+ public void reportWorkUnitCountMetrics(int workUnitCount, JobState jobState)
{
+ if (!this.metricContext.isPresent() ||
!jobState.getPropAsBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
true)) {
+ return;
+ }
+ String workunitCountGaugeName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY),
+ jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY),
jobState.getProp(FLOW_EDGE_ID_KEY, jobState.getJobName()),
JobEvent.WORK_UNITS_CREATED);
+ ContextAwareGauge<Integer> workunitCountGauge = this.metricContext.get()
+ .newContextAwareGauge(workunitCountGaugeName, () ->
Integer.valueOf(workUnitCount));
+ this.metricContext.get().register(workunitCountGaugeName,
workunitCountGauge);
+ }
+}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
index 1d781c732..a8916045a 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
@@ -17,8 +17,10 @@
package org.apache.gobblin.runtime;
+import com.codahale.metrics.Metric;
import java.io.FileReader;
import java.io.IOException;
+import java.util.Map;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -27,6 +29,7 @@ import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.local.LocalJobLauncher;
+import org.apache.gobblin.runtime.metrics.ServiceGobblinJobMetricReporter;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.limiter.BaseLimiterType;
import org.apache.gobblin.util.limiter.DefaultLimiterFactory;
@@ -63,8 +66,7 @@ public class LocalJobLauncherTest {
this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_ENABLED_KEY,
"true");
this.launcherProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY,
"true");
this.launcherProps.setProperty(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
"false");
- this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_URL_KEY,
- testMetastoreDatabase.getJdbcUrl());
+
this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_URL_KEY,
testMetastoreDatabase.getJdbcUrl());
StateStore<JobState.DatasetState> datasetStateStore =
new
FsStateStore<>(this.launcherProps.getProperty(ConfigurationKeys.STATE_STORE_FS_URI_KEY),
@@ -128,7 +130,6 @@ public class LocalJobLauncherTest {
Assert.assertEquals(jobContext.getJobState().getProp("templated0"), "x");
Assert.assertEquals(jobContext.getJobState().getProp("templated1"), "y");
-
// Verify multi-resolution with inheritance.
jobProps.setProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY,
"resource:///templates/test-multitemplate-with-inheritance.template,resource:///templates/test.template");
@@ -198,7 +199,7 @@ public class LocalJobLauncherTest {
}
}
- @Test(groups = { "ignore" })
+ @Test(groups = {"ignore"})
public void testCancelJob() throws Exception {
this.jobLauncherTestHelper.runTestWithCancellation(loadJobProps());
}
@@ -293,6 +294,41 @@ public class LocalJobLauncherTest {
}
}
+ @Test
+ public void testLaunchJobWithDefaultMetricsReporter() throws Exception {
+ Properties jobProps = loadJobProps();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
"-testDefaultMetricsReporter");
+ try {
+ JobContext jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+ Map<String, Metric> metrics =
jobContext.getJobMetricsOptional().get().getMetricContext().getMetrics();
+
Assert.assertTrue(metrics.containsKey("JobMetrics.WorkUnitsCreationTimer.GobblinTest1-testDefaultMetricsReporter"));
+
+ } finally {
+
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+ }
+
+
+ @Test
+ public void testLaunchJobWithServiceMetricsReporter() throws Exception {
+ Properties jobProps = loadJobProps();
+ jobProps.setProperty(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
"true");
+ jobProps.setProperty(ConfigurationKeys.JOB_METRICS_REPORTER_CLASS_KEY,
ServiceGobblinJobMetricReporter.class.getName());
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
"FlowName_FlowGroup_JobName_EdgeId_Hash");
+ jobProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "FlowGroup");
+ jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "FlowName");
+ jobProps.setProperty("flow.edge.id", "EdgeId");
+ try {
+ JobContext jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+ Map<String, Metric> metrics =
jobContext.getJobMetricsOptional().get().getMetricContext().getMetrics();
+
Assert.assertTrue(metrics.containsKey("GobblinService.FlowGroup.FlowName.EdgeId.WorkUnitsCreated"));
+
Assert.assertTrue(metrics.containsKey("GobblinService.FlowGroup.FlowName.EdgeId.WorkUnitsCreationTimer"));
+ } finally {
+
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+ }
+
@AfterClass(alwaysRun = true)
public void tearDown() throws IOException {
if (testMetastoreDatabase != null) {
@@ -310,4 +346,4 @@ public class LocalJobLauncherTest {
return jobProps;
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporterTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporterTest.java
new file mode 100644
index 000000000..fe84432f9
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/metrics/GobblinJobMetricReporterTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.metrics;
+
+import com.codahale.metrics.Metric;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobLauncherTestHelper;
+import org.apache.gobblin.runtime.JobState;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+
+/**
+ * Unit test for {@link GobblinJobMetricReporter and its implementation
classes}.
+ */
+@Test(groups = { "gobblin.runtime.local" })
+public class GobblinJobMetricReporterTest {
+
+ private Properties launcherProps;
+ private JobLauncherTestHelper jobLauncherTestHelper;
+ private ITestMetastoreDatabase testMetastoreDatabase;
+
+ @BeforeClass
+ public void startUp() throws Exception {
+ testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ this.launcherProps = new Properties();
+ this.launcherProps.load(new
FileReader("gobblin-test/resource/gobblin.test.properties"));
+
this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_ENABLED_KEY,
"true");
+ this.launcherProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY,
"true");
+
this.launcherProps.setProperty(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
"false");
+
this.launcherProps.setProperty(ConfigurationKeys.JOB_HISTORY_STORE_URL_KEY,
testMetastoreDatabase.getJdbcUrl());
+
+ StateStore<JobState.DatasetState> datasetStateStore =
+ new
FsStateStore<>(this.launcherProps.getProperty(ConfigurationKeys.STATE_STORE_FS_URI_KEY),
+
this.launcherProps.getProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY),
JobState.DatasetState.class);
+
+ this.jobLauncherTestHelper = new JobLauncherTestHelper(this.launcherProps,
datasetStateStore);
+ }
+
+ @Test
+ public void testLaunchJobWithDefaultMetricsReporter() throws Exception {
+ Properties jobProps = loadJobProps();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
"-testDefaultMetricsReporter");
+ try {
+ JobContext jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+ Map<String, Metric> metrics =
jobContext.getJobMetricsOptional().get().getMetricContext().getMetrics();
+
Assert.assertTrue(metrics.containsKey("JobMetrics.WorkUnitsCreationTimer.GobblinTest1-testDefaultMetricsReporter"));
+
+ } finally {
+
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+ }
+
+
+ @Test
+ public void testLaunchJobWithServiceMetricsReporter() throws Exception {
+ Properties jobProps = loadJobProps();
+ jobProps.setProperty(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
"true");
+ jobProps.setProperty(ConfigurationKeys.JOB_METRICS_REPORTER_CLASS_KEY,
ServiceGobblinJobMetricReporter.class.getName());
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
"FlowName_FlowGroup_JobName_EdgeId_Hash");
+ jobProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "FlowGroup");
+ jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "FlowName");
+ jobProps.setProperty("flow.edge.id", "EdgeId");
+ try {
+ JobContext jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+ Map<String, Metric> metrics =
jobContext.getJobMetricsOptional().get().getMetricContext().getMetrics();
+
Assert.assertTrue(metrics.containsKey("GobblinService.FlowGroup.FlowName.EdgeId.WorkUnitsCreated"));
+
Assert.assertTrue(metrics.containsKey("GobblinService.FlowGroup.FlowName.EdgeId.WorkUnitsCreationTimer"));
+ } finally {
+
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws IOException {
+ if (testMetastoreDatabase != null) {
+ testMetastoreDatabase.close();
+ }
+ }
+
+ private Properties loadJobProps() throws IOException {
+ Properties jobProps = new Properties();
+ jobProps.load(new
FileReader("gobblin-test/resource/job-conf/GobblinTest1.pull"));
+ jobProps.putAll(this.launcherProps);
+ jobProps.setProperty(JobLauncherTestHelper.SOURCE_FILE_LIST_KEY,
+ "gobblin-test/resource/source/test.avro.0," +
"gobblin-test/resource/source/test.avro.1,"
+ + "gobblin-test/resource/source/test.avro.2," +
"gobblin-test/resource/source/test.avro.3");
+
+ return jobProps;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index d106aaddb..f57169553 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -681,7 +681,7 @@ public class DagManager extends AbstractIdleService {
FlowId flowId = DagManagerUtils.getFlowId(dag);
// Do not register flow-specific metrics for a flow
- if (!flowGauges.containsKey(flowId.toString()) &&
!DagManagerUtils.isDagFromAdhocFlow(dag)) {
+ if (!flowGauges.containsKey(flowId.toString()) &&
DagManagerUtils.shouldFlowOutputMetrics(dag)) {
String flowStateGaugeName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
flowId.getFlowGroup(),
flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
flowGauges.put(flowId.toString(), FlowState.RUNNING);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 5ec95793a..b59a9636e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -312,9 +312,9 @@ public class DagManagerUtils {
return dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
}
- static boolean isDagFromAdhocFlow(Dag<JobExecutionPlan> dag) {
+ static boolean shouldFlowOutputMetrics(Dag<JobExecutionPlan> dag) {
// defaults to false (so metrics are still tracked) if the dag property is
not configured due to old dags
- return ConfigUtils.getBoolean(getDagJobConfig(dag),
ConfigurationKeys.GOBBLIN_FLOW_ISADHOC,false);
+ return ConfigUtils.getBoolean(getDagJobConfig(dag),
ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, true);
}
static String getSpecExecutorName(DagNode<JobExecutionPlan> dagNode) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 89d8527ab..50d2a5e14 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -127,7 +127,7 @@ public class JobExecutionPlan {
//Add flow execution id
.withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
ConfigValueFactory.fromAnyRef(flowExecutionId))
// Remove schedule due to namespace conflict with azkaban schedule
key, but still keep track if flow is scheduled or not
- .withValue(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC,
ConfigValueFactory.fromAnyRef(!jobSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)))
+ .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
ConfigValueFactory.fromAnyRef(jobSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)))
.withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
//Remove template uri
.withoutPath(GOBBLIN_JOB_TEMPLATE_KEY)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 0a372b3ce..363d5c744 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -22,14 +22,11 @@ import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
-import java.util.SortedMap;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.rholder.retry.Attempt;
@@ -53,11 +50,8 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.event.CountEventBuilder;
-import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -114,8 +108,6 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
private final JobIssueEventHandler jobIssueEventHandler;
- private final ConcurrentHashMap<String, Long> flowNameGroupToWorkUnitCount;
-
private final Retryer<Void> persistJobStatusRetryer;
public KafkaJobStatusMonitor(String topic, Config config, int numThreads,
JobIssueEventHandler jobIssueEventHandler)
@@ -129,8 +121,6 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
this.jobIssueEventHandler = jobIssueEventHandler;
- this.flowNameGroupToWorkUnitCount = new ConcurrentHashMap<>();
-
Config retryerOverridesConfig =
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
: ConfigFactory.empty();
@@ -191,11 +181,6 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
- if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
- emitWorkUnitCountMetric(gobblinTrackingEvent);
- return;
- }
-
try {
persistJobStatusRetryer.call(() -> {
// re-create `jobStatus` on each attempt, since mutated within
`addJobStatusToStateStore`
@@ -314,29 +299,6 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
return new org.apache.gobblin.configuration.State(mergedState);
}
- private void emitWorkUnitCountMetric(GobblinTrackingEvent event) {
- Properties properties = new Properties();
- properties.putAll(event.getMetadata());
-
- Long numWorkUnits =
Long.parseLong(properties.getProperty(CountEventBuilder.COUNT_KEY));
- String workUnitCountName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
-
properties.getProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD),
- properties.getProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD),
- JobEvent.WORK_UNITS_CREATED);
-
- SortedMap<String, Gauge> existingGauges =
this.getMetricContext().getGauges(
- (name, metric) -> name.equals(workUnitCountName));
-
- // If gauge for this flow name and group exists, then value will be
updated by reference. Otherwise create
- // a new gauge and save a reference to the value in the HashMap
- this.flowNameGroupToWorkUnitCount.put(workUnitCountName, numWorkUnits);
- if (existingGauges.isEmpty()) {
- ContextAwareGauge gauge =
this.getMetricContext().newContextAwareGauge(workUnitCountName,
- () -> this.flowNameGroupToWorkUnitCount.get(workUnitCountName));
- this.getMetricContext().register(workUnitCountName, gauge);
- }
- }
-
public static String jobStatusTableName(String flowExecutionId, String
jobGroup, String jobName) {
return
Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId,
jobGroup, jobName, ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 53f4d3284..3b685011f 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -953,7 +953,7 @@ public class DagManagerTest {
Long flowId = System.currentTimeMillis();
Dag<JobExecutionPlan> adhocDag = buildDag(String.valueOf(flowId), flowId,
"FINISH_RUNNING", 1, "proxyUser",
-
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC,
true).build()); //Add a dag to the queue of dags
+
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
false).build()); //Add a dag to the queue of dags
this.queue.offer(adhocDag);
Iterator<JobStatus> jobStatusIterator1 =
@@ -976,7 +976,7 @@ public class DagManagerTest {
Assert.assertNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName0));
Dag<JobExecutionPlan> scheduledDag = buildDag(String.valueOf(flowId+1),
flowId+1, "FINISH_RUNNING", 1, "proxyUser",
-
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC,
false).build());
+
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
true).build());
this.queue.offer(scheduledDag);
this._dagManagerThread.run();
String flowStateGaugeName1 =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
"group"+(flowId+1),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index d07775fb3..cba53dc67 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -199,9 +199,9 @@ public class JobExecutionPlanDagFactoryTest {
Assert.assertEquals(dag2.getStartNodes().size(), 1);
Assert.assertEquals(dag2.getEndNodes().size(), 1);
Assert.assertEquals(dag2.getNodes().size(), 1);
- // Dag1 is scheduled so should be adhoc, but not dag2
-
Assert.assertFalse(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC));
-
Assert.assertTrue(dag2.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_FLOW_ISADHOC));
+ // Dag1 is scheduled so should be adhoc and output metrics, but not dag2
+
Assert.assertTrue(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS));
+
Assert.assertFalse(dag2.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS));
}
}
\ No newline at end of file