This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 179baac40d Emit job metrics at RM level (#4143)
179baac40d is described below
commit 179baac40d09d1542129f6158f46c016e3247fc5
Author: thisisArjit <[email protected]>
AuthorDate: Tue Oct 7 21:36:37 2025 +0530
Emit job metrics at RM level (#4143)
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../apache/gobblin/metrics/event/TimingEvent.java | 1 +
.../apache/gobblin/metrics/ServiceMetricNames.java | 2 ++
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 42 ++++++++++++++++++++++
4 files changed, 46 insertions(+)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d33a99c2e5..b46ab759ea 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -184,6 +184,7 @@ public class ConfigurationKeys {
public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
public static final String FLOW_SPEC_EXECUTOR = "flow.edge.specExecutors";
+ // Job property read as the value of the rmHost metric attribute (defaults to "NA" when unset);
+ // presumably the Hadoop ResourceManager RPC address — confirm against the job configs.
+ public static final String RM_HOST_KEY = "hadoop.resource.manager.rpc";
/**
* Common topology configuration properties.
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 879410c05d..3d68a5b6ae 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -92,6 +92,7 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static final String JOB_TAG_FIELD = "jobTag";
public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
+ // Attribute name under which the resource-manager host of a job is reported on metrics.
+ public static final String RM_HOST_FIELD = "rmHost";
public static final String LOW_WATERMARK_FIELD = "lowWatermark";
public static final String HIGH_WATERMARK_FIELD = "highWatermark";
public static final String PROCESSED_COUNT_FIELD = "processedCount";
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 7809e785f5..e3c729a7a6 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -30,6 +30,8 @@ public class ServiceMetricNames {
public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT =
"dataQualityNonEvaluatedFileCount";
public static final String DATA_QUALITY_BYTES_READ = "dataQualityBytesRead";
public static final String DATA_QUALITY_BYTES_WRITTEN =
"dataQualityBytesWritten";
+ // RM metric names
+ // Name of the counter incremented once per job observed at the resource-manager level.
+ public static final String RM_JOB_OBSERVED_COUNT = "rmJobObservedCount";
// Flow Compilation Meters and Timer
public static final String FLOW_COMPILATION_SUCCESSFUL_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index c62468305c..9b35921158 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -32,6 +32,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -53,6 +55,9 @@ import org.apache.gobblin.converter.initializer.ConverterInitializer;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.initializer.Initializer;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.AbstractJobLauncher;
@@ -80,6 +85,8 @@ import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.writer.initializer.WriterInitializer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
+import static
org.apache.gobblin.runtime.JobState.GAAS_OBSERVABILITY_METRICS_GROUPNAME;
+
@Slf4j
@@ -142,6 +149,8 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
// TODO: provide for job cancellation (unless handling at the
temporal-level of parent workflows)!
JobState jobState = new JobState(jobProps);
log.info("Created jobState: {}", jobState.toJsonString(true));
+ // emit jobs observed at RM level
+ emitMetrics(jobState);
int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() ->
activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
@@ -354,4 +363,37 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
public static int getConfiguredNumSizeSummaryQuantiles(State state) {
return
state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES,
GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
}
+
+ /**
+ * Emit a counter data point recording that this job was observed at the
+ * resource-manager (RM) level.
+ *
+ * <p>The counter {@link ServiceMetricNames#RM_JOB_OBSERVED_COUNT} is incremented by one and
+ * tagged with the attributes built by {@code getEventAttributes}. Emission is best-effort:
+ * when no OpenTelemetry metrics instance is available it is skipped with a warning, and any
+ * exception is logged rather than rethrown, so a metrics failure does not propagate to the caller.
+ *
+ * @param jobState job state used to obtain the metrics instance and the metric attributes
+ */
+ private void emitMetrics(JobState jobState) {
+ try {
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ // Guard: there is nothing to emit to when no metrics instance is returned for this job.
+ if (otelMetrics == null) {
+ log.warn("OpenTelemetry metrics instance is null, skipping metrics
emission");
+ return;
+ }
+
+ Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
+ Attributes tags = getEventAttributes(jobState);
+ log.info("Emitting metrics for job: {}", jobState.getJobName());
+ String jobMetricDescription = "Number of Jobs observed on RM";
+ String jobMetricName = ServiceMetricNames.RM_JOB_OBSERVED_COUNT;
+ // The counter is (re)built on every call. NOTE(review): this relies on the OpenTelemetry SDK
+ // handing back the same instrument for an identical name/description — confirm.
+
meter.counterBuilder(jobMetricName).setDescription(jobMetricDescription).build().add(1,
tags);
+ } catch (Exception e) {
+ // Deliberately broad: a metrics failure is logged and swallowed instead of propagating.
+ log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
+ }
+ }
+
+ /**
+ * Build the attribute set attached to the RM-level job metric: flow name, flow group,
+ * flow execution id, OpenTelemetry reporting fabric, and RM host. Each value is read from
+ * {@code jobState} and defaults to {@code "NA"} when the property is absent.
+ *
+ * @param jobState job state from which the attribute values are read
+ * @return the OpenTelemetry attributes to tag the metric data point with
+ */
+ private Attributes getEventAttributes(JobState jobState) {
+ return Attributes.builder()
+ .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "NA"))
+ .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "NA"))
+ .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "NA"))
+ .put(TimingEvent.FlowEventConstants.FLOW_FABRIC,
jobState.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_FABRIC,
"NA"))
+ .put(TimingEvent.FlowEventConstants.RM_HOST_FIELD,
jobState.getProp(ConfigurationKeys.RM_HOST_KEY, "NA"))
+ .build();
+ }
}