This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 179baac40d Emit job metrics at RM level (#4143)
179baac40d is described below

commit 179baac40d09d1542129f6158f46c016e3247fc5
Author: thisisArjit <[email protected]>
AuthorDate: Tue Oct 7 21:36:37 2025 +0530

    Emit job metrics at RM level (#4143)
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../apache/gobblin/metrics/event/TimingEvent.java  |  1 +
 .../apache/gobblin/metrics/ServiceMetricNames.java |  2 ++
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   | 42 ++++++++++++++++++++++
 4 files changed, 46 insertions(+)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d33a99c2e5..b46ab759ea 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -184,6 +184,7 @@ public class ConfigurationKeys {
   public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
   public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
   public static final String FLOW_SPEC_EXECUTOR = "flow.edge.specExecutors";
+  public static final String RM_HOST_KEY = "hadoop.resource.manager.rpc";
 
   /**
    * Common topology configuration properties.
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 879410c05d..3d68a5b6ae 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -92,6 +92,7 @@ public class TimingEvent extends GobblinEventBuilder implements Closeable {
     public static final String JOB_TAG_FIELD = "jobTag";
     public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
     public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
+    public static final String RM_HOST_FIELD = "rmHost";
     public static final String LOW_WATERMARK_FIELD = "lowWatermark";
     public static final String HIGH_WATERMARK_FIELD = "highWatermark";
     public static final String PROCESSED_COUNT_FIELD = "processedCount";
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 7809e785f5..e3c729a7a6 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -30,6 +30,8 @@ public class ServiceMetricNames {
   public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT = "dataQualityNonEvaluatedFileCount";
   public static final String DATA_QUALITY_BYTES_READ = "dataQualityBytesRead";
   public static final String DATA_QUALITY_BYTES_WRITTEN = "dataQualityBytesWritten";
+  // RM metric names
+  public static final String RM_JOB_OBSERVED_COUNT = "rmJobObservedCount";
 
   // Flow Compilation Meters and Timer
   public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index c62468305c..9b35921158 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -32,6 +32,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
@@ -53,6 +55,9 @@ import org.apache.gobblin.converter.initializer.ConverterInitializer;
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.initializer.Initializer;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
@@ -80,6 +85,8 @@ import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.writer.initializer.WriterInitializer;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
+import static org.apache.gobblin.runtime.JobState.GAAS_OBSERVABILITY_METRICS_GROUPNAME;
+
 
 
 @Slf4j
@@ -142,6 +149,8 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
     // TODO: provide for job cancellation (unless handling at the temporal-level of parent workflows)!
     JobState jobState = new JobState(jobProps);
     log.info("Created jobState: {}", jobState.toJsonString(true));
+    // emit jobs observed at RM level
+    emitMetrics(jobState);
 
     int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
     heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
@@ -354,4 +363,37 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
   public static int getConfiguredNumSizeSummaryQuantiles(State state) {
     return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
   }
+
+  /**
+   * Emit metrics to indicate jobs observed at RM level
+   * @param jobState job state
+   */
+  private void emitMetrics(JobState jobState) {
+    try {
+      OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState);
+      if (otelMetrics == null) {
+        log.warn("OpenTelemetry metrics instance is null, skipping metrics emission");
+        return;
+      }
+
+      Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
+      Attributes tags = getEventAttributes(jobState);
+      log.info("Emitting metrics for job: {}", jobState.getJobName());
+      String jobMetricDescription = "Number of Jobs observed on RM";
+      String jobMetricName = ServiceMetricNames.RM_JOB_OBSERVED_COUNT;
+      meter.counterBuilder(jobMetricName).setDescription(jobMetricDescription).build().add(1, tags);
+    } catch (Exception e) {
+      log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
+    }
+  }
+
+  private Attributes getEventAttributes(JobState jobState) {
+    return Attributes.builder()
+        .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "NA"))
+        .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "NA"))
+        .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "NA"))
+        .put(TimingEvent.FlowEventConstants.FLOW_FABRIC, jobState.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_FABRIC, "NA"))
+        .put(TimingEvent.FlowEventConstants.RM_HOST_FIELD, jobState.getProp(ConfigurationKeys.RM_HOST_KEY, "NA"))
+        .build();
+  }
 }

Reply via email to