This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e557b872f [GOBBLIN-2246] Make Orchestrator MDM metric dimension 
configurable  (#4164)
9e557b872f is described below

commit 9e557b872f626d3d054fb9d29ba499bd4ca0488e
Author: Gulbarga Adithya Rao <[email protected]>
AuthorDate: Thu Feb 19 16:35:21 2026 +0530

    [GOBBLIN-2246] Make Orchestrator MDM metric dimension configurable  (#4164)
    
    Added configurable mdm dimensions to jobSucceeded
---
 .../org/apache/gobblin/runtime/util/GsonUtils.java |   5 +
 .../GaaSJobObservabilityEventProducer.java         | 276 ++++++-
 .../GaaSJobObservabilityProducerTest.java          | 801 +++++++++++++++------
 3 files changed, 865 insertions(+), 217 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/GsonUtils.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/GsonUtils.java
index 32a0672703..72b7aca4bf 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/GsonUtils.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/GsonUtils.java
@@ -24,6 +24,11 @@ import com.google.gson.GsonBuilder;
 
 public final class GsonUtils {
 
+  /**
+   * Plain GSON serializer for general-purpose JSON parsing where no special 
adapters are needed.
+   */
+  public static final Gson GSON = new Gson();
+
   /**
    * GSON serializer that can convert Java 8 date and time types to ISO 
standard representation.
    */
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
index 860f3ea587..836584bf01 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
@@ -21,16 +21,21 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Splitter;
 import com.google.gson.reflect.TypeToken;
+import org.apache.avro.Schema.Field;
 
 import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
 import io.opentelemetry.api.metrics.ObservableLongMeasurement;
 import lombok.extern.slf4j.Slf4j;
 
@@ -77,6 +82,55 @@ public abstract class GaaSJobObservabilityEventProducer 
implements Closeable {
   public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
   public static final String GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME = 
"jobSucceeded";
   private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "-";
+  private static final Splitter COMMA_SPLITTER = 
Splitter.on(',').omitEmptyStrings().trimResults();
+
+  /**
+   * JSON map config: {@code <mdm_dim_key>: <gaas_obs_event_field_key>}.
+   * This comes from orchestrator configs. This serves as default set of 
dimensions to emit.
+   */
+  public static final String JOB_SUCCEEDED_DIMENSIONS_MAP_KEY =
+          "metrics.reporting.opentelemetry.jobSucceeded.dimensionsMap";
+
+  /**
+   * This comes from orchestrator configs.
+   * Flag to enable/disable per-run extra dimensions for {@link 
#GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME}.
+   */
+  public static final String JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY =
+          
"metrics.reporting.opentelemetry.jobSucceeded.extraDimensions.enabled";
+
+  /**
+   * This comes from common.properties
+   * Job property (passed by template(/user)) listing extra dimension keys 
(comma-separated).
+   */
+  public static final String JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP =
+          "metrics.reporting.opentelemetry.jobSucceeded.extraDimensions.keys";
+
+  /**
+   * This comes from orchestrator configs.
+   * Maximum number of dimensions to emit for {@code jobSucceeded}. Defaults 
to 20.
+   */
+  public static final String JOB_SUCCEEDED_MAX_DIMENSIONS_KEY =
+          "metrics.reporting.opentelemetry.jobSucceeded.maxDimensions";
+  private static final int DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS = 20;
+
+  /**
+   * Baseline dimensions for the {@code jobSucceeded} metric. These should 
always be present for backward compatibility,
+   * even if the orchestrator-provided {@link 
#JOB_SUCCEEDED_DIMENSIONS_MAP_KEY} is incomplete.
+   *
+   * <p>Map values are Avro field names on {@link GaaSJobObservabilityEvent} 
(strict).</p>
+   */
+  private static final Map<String, String> 
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP = 
createJobSucceededBaselineDimensionsMap();
+
+  private static Map<String, String> createJobSucceededBaselineDimensionsMap() 
{
+    Map<String, String> baseline = new LinkedHashMap<>();
+    baseline.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, "flowName");
+    baseline.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, "flowGroup");
+    baseline.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, "jobName");
+    baseline.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"flowExecutionId");
+    baseline.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"executorId");
+    baseline.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "flowEdgeId");
+    return baseline;
+  }
 
   protected MetricContext metricContext;
   protected State state;
@@ -138,15 +192,221 @@ public abstract class GaaSJobObservabilityEventProducer 
implements Closeable {
     this.eventCollector.add(event);
   }
 
+  /**
+   * Creates dimensions for the OpenTelemetry event based on the configured 
mapping of MDM dimension keys to GaaS observability event field keys.
+   * If the config is not present or fails to parse, falls back to a default 
set of dimensions using hardcoded mappings.
+   * The dimension values are read from the GaaS observability event.
+   * If a value is missing or cannot be read, it defaults to "NA".
+   * @param event
+   * @return
+   */
   public Attributes getEventAttributes(GaaSJobObservabilityEvent event) {
-    Attributes tags = 
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
getOrDefault(event.getFlowName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
getOrDefault(event.getFlowGroup(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
getOrDefault(event.getJobName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
event.getFlowExecutionId())
-        .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
getOrDefault(event.getExecutorId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
getOrDefault(event.getFlowEdgeId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .build();
-    return tags;
+    int maxDimensions = getMaxJobSucceededDimensions();
+    Map<String, String> configuredMap = 
getConfiguredJobSucceededDimensionsMap(this.state);
+    if (configuredMap == null || configuredMap.isEmpty()) {
+      // Backward-compatible fallback: always emit the baseline dimensions.
+      // If the extra-dimensions gate is enabled, honor per-run dimensions 
from job properties.
+      AttributesBuilder builder = Attributes.builder();
+      for (Map.Entry<String, String> entry : 
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+        addObsEventFieldAttribute(builder, entry.getKey(), entry.getValue(), 
event);
+      }
+      addExtraDimensionsFromJobProperties(builder, 
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP, event, maxDimensions);
+      return builder.build();
+    }
+
+    // Ensure baseline dimensions are always emitted, even if orchestrator 
config is missing some keys.
+    Map<String, String> effectiveMap = new LinkedHashMap<>(configuredMap);
+    for (Map.Entry<String, String> baselineEntry : 
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+      effectiveMap.putIfAbsent(baselineEntry.getKey(), 
baselineEntry.getValue());
+    }
+
+    // If orchestrator config would push us past the cap, drop orchestrator 
dimensions (keep baseline only).
+    if (effectiveMap.size() > maxDimensions) {
+      log.warn("jobSucceeded would emit {} baseline+orchestrator dimensions 
which exceeds maxDimensions={} (`{}`); dropping orchestrator dimensions",
+          effectiveMap.size(), maxDimensions, 
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY);
+      effectiveMap = new 
LinkedHashMap<>(JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP);
+    }
+
+    AttributesBuilder builder = Attributes.builder();
+    for (Map.Entry<String, String> entry : effectiveMap.entrySet()) {
+      String mdmDimensionKey = entry.getKey();
+      String obsEventFieldKey = entry.getValue();
+      addObsEventFieldAttribute(builder, mdmDimensionKey, obsEventFieldKey, 
event);
+    }
+
+    addExtraDimensionsFromJobProperties(builder, effectiveMap, event, 
maxDimensions);
+
+    return builder.build();
+  }
+
+  private int getMaxJobSucceededDimensions() {
+    int jobSucceededMaxDimensions = DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS;
+    try {
+      jobSucceededMaxDimensions = 
this.state.getPropAsInt(JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, 
DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS);
+    } catch (Exception e) {
+      log.warn("Invalid `{}` value; using default {}", 
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS, e);
+    }
+    int baselineSize = JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.size();
+    if (jobSucceededMaxDimensions < baselineSize) {
+      log.warn("Configured `{}`={} is less than baseline dimension count {}; 
using {} instead",
+          JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, jobSucceededMaxDimensions, 
baselineSize, baselineSize);
+      return baselineSize;
+    }
+    return jobSucceededMaxDimensions;
+  }
+
+  /**
+   * Reads the JSON string config for job succeeded dimensions map, which is 
expected to be in the format of
+   *  {@code <mdm_dim_key>: <gaas_obs_event_field_key>}.
+   * @param state
+   * @return
+   */
+  static Map<String, String> getConfiguredJobSucceededDimensionsMap(State 
state) {
+    String raw = state.getProp(JOB_SUCCEEDED_DIMENSIONS_MAP_KEY, "");
+    if (StringUtils.isBlank(raw)) {
+      return null;
+    }
+    try {
+      Type type = new TypeToken<Map<String, String>>() {}.getType();
+      Map<String, String> parsed = GsonUtils.GSON.fromJson(raw, type);
+      if (parsed == null || parsed.isEmpty()) {
+        return null;
+      }
+      // Normalize into a deterministic insertion-order map.
+      return new LinkedHashMap<>(parsed);
+    } catch (Exception e) {
+      log.warn("Failed parsing jobSucceeded dimensionsMap config `{}`; falling 
back to hardcoded defaults. Raw value: {}",
+              JOB_SUCCEEDED_DIMENSIONS_MAP_KEY, raw, e);
+      return null;
+    }
+  }
+
+  /**
+   * Adds an attribute to the OpenTelemetry event builder based on the mapping 
of MDM dimension key to GaaS observability event field key.
+   * @param builder
+   * @param mdmDimensionKey
+   * @param obsEventFieldKey
+   * @param event
+   */
+  private void addObsEventFieldAttribute(AttributesBuilder builder, String 
mdmDimensionKey, String obsEventFieldKey,
+                                         GaaSJobObservabilityEvent event) {
+    if (StringUtils.isBlank(mdmDimensionKey) || 
StringUtils.isBlank(obsEventFieldKey)) {
+      return;
+    }
+    String normalizedEventKey = obsEventFieldKey.trim();
+
+    Object value = getObsEventFieldValue(normalizedEventKey, event);
+    if (value == null) {
+      builder.put(mdmDimensionKey, DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE);
+      return;
+    }
+    if (value instanceof Number) {
+      // OpenTelemetry attributes support numeric types, but for this metric 
we standardize all numeric
+      // dimension values as a LONG attribute for consistency (dimensions are 
expected to be discrete).
+      // Note: if a floating-point value (e.g., Double) ever shows up here, 
the fractional part will be truncated.
+      builder.put(mdmDimensionKey, ((Number) value).longValue());
+      return;
+    }
+    if (value instanceof CharSequence) {
+      builder.put(mdmDimensionKey, getOrDefault(value.toString(), 
DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE));
+      return;
+    }
+    builder.put(mdmDimensionKey, getOrDefault(String.valueOf(value), 
DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE));
+  }
+
+  /**
+   * Generic getter for {@link GaaSJobObservabilityEvent} fields used for 
dimension resolution.
+   *
+   * <p>Reads values by field name from the Avro schema (SpecificRecord {@code 
getSchema()/get(int)}),
+   */
+  private static Object getObsEventFieldValue(String obsEventFieldKey, 
GaaSJobObservabilityEvent event) {
+    try {
+      Field field = event.getSchema().getField(obsEventFieldKey);
+      if (field == null) {
+        return null;
+      }
+      return event.get(field.pos());
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  /**
+   * Reads extra dimension keys from job properties, and adds them as 
attributes if they are not already present in the default dimensions.
+   * The value for the extra dimension is read from job properties using the 
same key.
+   * This allows users to specify additional dimensions on a per-job basis 
without needing to change the service configuration.
+   * @param builder
+   * @param defaultDims
+   * @param event
+   */
+  private void addExtraDimensionsFromJobProperties(AttributesBuilder builder, 
Map<String, String> defaultDims,
+                                                   GaaSJobObservabilityEvent 
event, int maxDimensions) {
+    if 
(!this.state.getPropAsBoolean(JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY, 
false)) {
+      return;
+    }
+
+    Map<String, String> extraDims = 
getExtraDimensionsFromJobProperties(defaultDims, event);
+    if (extraDims.isEmpty()) {
+      return;
+    }
+
+    int total = defaultDims.size() + extraDims.size();
+    if (total > maxDimensions) {
+      log.warn("jobSucceeded extra dimensions would exceed maxDimensions={} 
(`{}`): default={} + extra={} => {}; skipping all extra dimensions",
+          maxDimensions, JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, defaultDims.size(), 
extraDims.size(), total);
+      return;
+    }
+
+    for (Map.Entry<String, String> entry : extraDims.entrySet()) {
+      builder.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  private static Map<String, String> 
getExtraDimensionsFromJobProperties(Map<String, String> defaultDims,
+      GaaSJobObservabilityEvent event) {
+    Map<String, String> jobProps = parseJobPropertiesJson(event);
+    if (jobProps == null || jobProps.isEmpty()) {
+      return new LinkedHashMap<>();
+    }
+    String extraDimensionKeys = 
jobProps.getOrDefault(JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP, "");
+    if (StringUtils.isBlank(extraDimensionKeys)) {
+      return new LinkedHashMap<>();
+    }
+
+    Map<String, String> extras = new LinkedHashMap<>();
+    for (String key : COMMA_SPLITTER.split(extraDimensionKeys)) {
+      if (StringUtils.isBlank(key)) {
+        continue;
+      }
+      // Do not override service-default keys (map keys are the emitted OTel 
keys)
+      if (defaultDims.containsKey(key)) {
+        continue;
+      }
+      String value = jobProps.get(key);
+      if (StringUtils.isBlank(value)) {
+        continue;
+      }
+      extras.put(key, value);
+    }
+    return extras;
+  }
+
+  /**
+   * Parses the job properties JSON string from the event into a Map. Returns 
null if the raw string is blank or if parsing fails.
+   * @param event
+   * @return
+   */
+  private static Map<String, String> 
parseJobPropertiesJson(GaaSJobObservabilityEvent event) {
+    try {
+      String raw = event.getJobProperties();
+      if (StringUtils.isBlank(raw)) {
+        return null;
+      }
+      Type type = new TypeToken<Map<String, String>>() {}.getType();
+      return GsonUtils.GSON.fromJson(raw, type);
+    } catch (Exception e) {
+      return null;
+    }
   }
 
   /**
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
index 366a1408f7..f168407f43 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
@@ -88,64 +88,66 @@ public class GaaSJobObservabilityProducerTest {
 
     State state = new State();
     state.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, 
"testCluster");
-    MockGaaSJobObservabilityEventProducer producer = new 
MockGaaSJobObservabilityEventProducer(state, this.issueRepository, false);
-    Map<String, String> gteEventMetadata = Maps.newHashMap();
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
-    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
flowExecutionId);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"sourceNode_destinationNode_flowEdge");
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
-    gteEventMetadata.put(AzkabanProjectConfig.USER_TO_PROXY, "azkabanUser");
-    gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
-    gteEventMetadata.put(TimingEvent.JOB_START_TIME, "20");
-    gteEventMetadata.put(TimingEvent.JOB_END_TIME, "100");
-    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
-    gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
-    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
 "20");
-    gteEventMetadata.put(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(summaries));
-    gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobProps));
-    Properties jobStatusProps = new Properties();
-    jobStatusProps.putAll(gteEventMetadata);
-    producer.emitObservabilityEvent(new State(jobStatusProps));
-
-    List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedJobEvents();
-
-    Assert.assertEquals(emittedEvents.size(), 1);
-    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
-    GaaSJobObservabilityEvent event = iterator.next();
-    Assert.assertEquals(event.getFlowGroup(), flowGroup);
-    Assert.assertEquals(event.getFlowName(), flowName);
-    Assert.assertEquals(event.getJobName(), jobName);
-    Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
-    Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
-    Assert.assertEquals(event.getExecutorUrl(), "hostName");
-    Assert.assertEquals(event.getIssues().size(), 1);
-    Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
-    Assert.assertEquals(event.getSourceNode(), "sourceNode");
-    Assert.assertEquals(event.getDestinationNode(), "destinationNode");
-    Assert.assertEquals(event.getExecutorId(), "specExecutor");
-    Assert.assertEquals(event.getEffectiveUserUrn(), "azkabanUser");
-    Assert.assertEquals(event.getJobOrchestratedTimestamp(), Long.valueOf(1));
-    Assert.assertEquals(event.getLastFlowModificationTimestamp(), 
Long.valueOf(20));
-    Assert.assertEquals(event.getJobStartTimestamp(), Long.valueOf(20));
-    Assert.assertEquals(event.getJobEndTimestamp(), Long.valueOf(100));
-    Assert.assertEquals(event.getDatasetsMetrics().size(), 2);
-    Assert.assertEquals(event.getDatasetsMetrics().get(0).getDatasetUrn(), 
dataset1.getDatasetUrn());
-    
Assert.assertEquals(event.getDatasetsMetrics().get(0).getEntitiesWritten(), 
Long.valueOf(dataset1.getRecordsWritten()));
-    Assert.assertEquals(event.getDatasetsMetrics().get(0).getBytesWritten(), 
Long.valueOf(dataset1.getBytesWritten()));
-    
Assert.assertEquals(event.getDatasetsMetrics().get(0).getSuccessfullyCommitted(),
 Boolean.valueOf(dataset1.isSuccessfullyCommitted()));
-    Assert.assertEquals(event.getDatasetsMetrics().get(1).getDatasetUrn(), 
dataset2.getDatasetUrn());
-    
Assert.assertEquals(event.getDatasetsMetrics().get(1).getEntitiesWritten(), 
Long.valueOf(dataset2.getRecordsWritten()));
-    Assert.assertEquals(event.getDatasetsMetrics().get(1).getBytesWritten(), 
Long.valueOf(dataset2.getBytesWritten()));
-    
Assert.assertEquals(event.getDatasetsMetrics().get(1).getSuccessfullyCommitted(),
 Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
-    JsonParser.parseString(event.getJobProperties()); // Should not throw
-    Assert.assertEquals(event.getGaasId(), "testCluster");
-    AvroSerializer<GaaSJobObservabilityEvent> serializer = new 
AvroBinarySerializer<>(
-        GaaSJobObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
-    );
-    serializer.serializeRecord(event);
+    try (MockGaaSJobObservabilityEventProducer producer =
+                 new MockGaaSJobObservabilityEventProducer(state, 
this.issueRepository, false)) {
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
flowExecutionId);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"sourceNode_destinationNode_flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(AzkabanProjectConfig.USER_TO_PROXY, "azkabanUser");
+      gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+      gteEventMetadata.put(TimingEvent.JOB_START_TIME, "20");
+      gteEventMetadata.put(TimingEvent.JOB_END_TIME, "100");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+      gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
 "20");
+      gteEventMetadata.put(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(summaries));
+      gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobProps));
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedJobEvents();
+
+      Assert.assertEquals(emittedEvents.size(), 1);
+      Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+      GaaSJobObservabilityEvent event = iterator.next();
+      Assert.assertEquals(event.getFlowGroup(), flowGroup);
+      Assert.assertEquals(event.getFlowName(), flowName);
+      Assert.assertEquals(event.getJobName(), jobName);
+      Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
+      Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+      Assert.assertEquals(event.getExecutorUrl(), "hostName");
+      Assert.assertEquals(event.getIssues().size(), 1);
+      Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
+      Assert.assertEquals(event.getSourceNode(), "sourceNode");
+      Assert.assertEquals(event.getDestinationNode(), "destinationNode");
+      Assert.assertEquals(event.getExecutorId(), "specExecutor");
+      Assert.assertEquals(event.getEffectiveUserUrn(), "azkabanUser");
+      Assert.assertEquals(event.getJobOrchestratedTimestamp(), 
Long.valueOf(1));
+      Assert.assertEquals(event.getLastFlowModificationTimestamp(), 
Long.valueOf(20));
+      Assert.assertEquals(event.getJobStartTimestamp(), Long.valueOf(20));
+      Assert.assertEquals(event.getJobEndTimestamp(), Long.valueOf(100));
+      Assert.assertEquals(event.getDatasetsMetrics().size(), 2);
+      Assert.assertEquals(event.getDatasetsMetrics().get(0).getDatasetUrn(), 
dataset1.getDatasetUrn());
+      
Assert.assertEquals(event.getDatasetsMetrics().get(0).getEntitiesWritten(), 
Long.valueOf(dataset1.getRecordsWritten()));
+      Assert.assertEquals(event.getDatasetsMetrics().get(0).getBytesWritten(), 
Long.valueOf(dataset1.getBytesWritten()));
+      
Assert.assertEquals(event.getDatasetsMetrics().get(0).getSuccessfullyCommitted(),
 Boolean.valueOf(dataset1.isSuccessfullyCommitted()));
+      Assert.assertEquals(event.getDatasetsMetrics().get(1).getDatasetUrn(), 
dataset2.getDatasetUrn());
+      
Assert.assertEquals(event.getDatasetsMetrics().get(1).getEntitiesWritten(), 
Long.valueOf(dataset2.getRecordsWritten()));
+      Assert.assertEquals(event.getDatasetsMetrics().get(1).getBytesWritten(), 
Long.valueOf(dataset2.getBytesWritten()));
+      
Assert.assertEquals(event.getDatasetsMetrics().get(1).getSuccessfullyCommitted(),
 Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
+      JsonParser.parseString(event.getJobProperties()); // Should not throw
+      Assert.assertEquals(event.getGaasId(), "testCluster");
+      try (AvroSerializer<GaaSJobObservabilityEvent> serializer = new 
AvroBinarySerializer<>(
+              GaaSJobObservabilityEvent.SCHEMA$, new 
NoopSchemaVersionWriter())) {
+        serializer.serializeRecord(event);
+      }
+    }
   }
 
   @Test
@@ -155,47 +157,48 @@ public class GaaSJobObservabilityProducerTest {
     String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobName1");
     String flowExecutionId = "1";
     this.issueRepository.put(
-        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
-        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
-    );
-    MockGaaSJobObservabilityEventProducer
-        producer = new MockGaaSJobObservabilityEventProducer(new State(), 
this.issueRepository, false);
-    Map<String, String> gteEventMetadata = Maps.newHashMap();
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
-    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
-    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
-
-    Properties jobStatusProps = new Properties();
-    jobStatusProps.putAll(gteEventMetadata);
-    producer.emitObservabilityEvent(new State(jobStatusProps));
-
-    List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedJobEvents();
-
-    Assert.assertEquals(emittedEvents.size(), 1);
-    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
-    GaaSJobObservabilityEvent event = iterator.next();
-    Assert.assertEquals(event.getFlowGroup(), flowGroup);
-    Assert.assertEquals(event.getFlowName(), flowName);
-    Assert.assertEquals(event.getJobName(), jobName);
-    Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
-    Assert.assertEquals(event.getJobStatus(), JobStatus.CANCELLED);
-    Assert.assertEquals(event.getIssues().size(), 1);
-    Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
-    Assert.assertEquals(event.getExecutorId(), "specExecutor");
-    Assert.assertEquals(event.getJobOrchestratedTimestamp(), null);
-    Assert.assertEquals(event.getJobStartTimestamp(), null);
-    Assert.assertEquals(event.getEffectiveUserUrn(), null);
-    Assert.assertEquals(event.getExecutorUrl(), null);
-
-    AvroSerializer<GaaSJobObservabilityEvent> serializer = new 
AvroBinarySerializer<>(
-        GaaSJobObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
+            TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
+            createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
     );
-    serializer.serializeRecord(event);
+    try (MockGaaSJobObservabilityEventProducer producer =
+                 new MockGaaSJobObservabilityEventProducer(new State(), 
this.issueRepository, false)) {
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      List<GaaSJobObservabilityEvent> emittedEvents = 
producer.getTestEmittedJobEvents();
+
+      Assert.assertEquals(emittedEvents.size(), 1);
+      Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+      GaaSJobObservabilityEvent event = iterator.next();
+      Assert.assertEquals(event.getFlowGroup(), flowGroup);
+      Assert.assertEquals(event.getFlowName(), flowName);
+      Assert.assertEquals(event.getJobName(), jobName);
+      Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
+      Assert.assertEquals(event.getJobStatus(), JobStatus.CANCELLED);
+      Assert.assertEquals(event.getIssues().size(), 1);
+      Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
+      Assert.assertEquals(event.getExecutorId(), "specExecutor");
+      Assert.assertEquals(event.getJobOrchestratedTimestamp(), null);
+      Assert.assertEquals(event.getJobStartTimestamp(), null);
+      Assert.assertEquals(event.getEffectiveUserUrn(), null);
+      Assert.assertEquals(event.getExecutorUrl(), null);
+
+      try (AvroSerializer<GaaSJobObservabilityEvent> serializer = new 
AvroBinarySerializer<>(
+              GaaSJobObservabilityEvent.SCHEMA$, new 
NoopSchemaVersionWriter())) {
+        serializer.serializeRecord(event);
+      }
+    }
   }
 
   @Test
@@ -205,43 +208,44 @@ public class GaaSJobObservabilityProducerTest {
     String jobName = JobStatusRetriever.NA_KEY;
     String flowExecutionId = "1";
     this.issueRepository.put(
-        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
-        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+            TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
+            createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
     );
     State producerState = new State();
     
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
 "true");
-    MockGaaSJobObservabilityEventProducer
-        producer = new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, false);
-    Map<String, String> gteEventMetadata = Maps.newHashMap();
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
-    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
-    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
-    gteEventMetadata.put(SerializationConstants.FLOW_START_TIME_KEY, "1");
-
-    Properties jobStatusProps = new Properties();
-    jobStatusProps.putAll(gteEventMetadata);
-    producer.emitObservabilityEvent(new State(jobStatusProps));
-
-    List<GaaSFlowObservabilityEvent> emittedEvents = 
producer.getTestEmittedFlowEvents();
-
-    Assert.assertEquals(emittedEvents.size(), 1);
-    Iterator<GaaSFlowObservabilityEvent> iterator = emittedEvents.iterator();
-    GaaSFlowObservabilityEvent event = iterator.next();
-    Assert.assertEquals(event.getFlowGroup(), flowGroup);
-    Assert.assertEquals(event.getFlowName(), flowName);
-    Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
-    Assert.assertEquals(event.getFlowStatus(), FlowStatus.SUCCEEDED);
-    Assert.assertNull(event.getEffectiveUserUrn());
-    Assert.assertEquals(event.getFlowStartTimestamp(), Long.valueOf(1));
-
-    AvroSerializer<GaaSFlowObservabilityEvent> serializer = new 
AvroBinarySerializer<>(
-        GaaSFlowObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
-    );
-    serializer.serializeRecord(event);
+    try (MockGaaSJobObservabilityEventProducer producer =
+                 new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, false)) {
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+      gteEventMetadata.put(SerializationConstants.FLOW_START_TIME_KEY, "1");
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      List<GaaSFlowObservabilityEvent> emittedEvents = 
producer.getTestEmittedFlowEvents();
+
+      Assert.assertEquals(emittedEvents.size(), 1);
+      Iterator<GaaSFlowObservabilityEvent> iterator = emittedEvents.iterator();
+      GaaSFlowObservabilityEvent event = iterator.next();
+      Assert.assertEquals(event.getFlowGroup(), flowGroup);
+      Assert.assertEquals(event.getFlowName(), flowName);
+      Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
+      Assert.assertEquals(event.getFlowStatus(), FlowStatus.SUCCEEDED);
+      Assert.assertNull(event.getEffectiveUserUrn());
+      Assert.assertEquals(event.getFlowStartTimestamp(), Long.valueOf(1));
+
+      try (AvroSerializer<GaaSFlowObservabilityEvent> serializer = new 
AvroBinarySerializer<>(
+              GaaSFlowObservabilityEvent.SCHEMA$, new 
NoopSchemaVersionWriter())) {
+        serializer.serializeRecord(event);
+      }
+    }
   }
 
   @Test
@@ -251,31 +255,31 @@ public class GaaSJobObservabilityProducerTest {
     String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobName1");
     String flowExecutionId = "1";
     this.issueRepository.put(
-        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
-        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+            TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
+            createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
     );
     State producerState = new State();
     
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
     
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT,
 "http://localhost:5000";);
 
-    MockGaaSJobObservabilityEventProducer
-        producer = new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true);
-
-    Map<String, String> gteEventMetadata = Maps.newHashMap();
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
-    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
-    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
-
-    Properties jobStatusProps = new Properties();
-    jobStatusProps.putAll(gteEventMetadata);
-
-    // Ensure that this doesn't throw due to NPE
-    producer.emitObservabilityEvent(new State(jobStatusProps));
+    try (MockGaaSJobObservabilityEventProducer producer =
+                 new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true)) {
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+
+      // Ensure that this doesn't throw due to NPE
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+    }
   }
 
   @Test
@@ -285,67 +289,446 @@ public class GaaSJobObservabilityProducerTest {
     String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobName1");
     String flowExecutionId = "1";
     this.issueRepository.put(
-        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
-        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+            TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
+            createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
     );
     State producerState = new State();
     
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
 
-    MockGaaSJobObservabilityEventProducer
-        producer = new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true);
-    Map<String, String> gteEventMetadata = Maps.newHashMap();
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
-    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
-    gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
-    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
-
-    Map<String, String> gteEventMetadata2 = Maps.newHashMap();
-    gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
-    gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
-    
gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"2");
-    gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
-    gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
-    gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
-    gteEventMetadata2.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
-    gteEventMetadata2.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
-
-    Properties jobStatusProps = new Properties();
-    Properties jobStatusProps2 = new Properties();
-    jobStatusProps.putAll(gteEventMetadata);    // Ensure that this doesn't 
throw due to NPE
-    producer.emitObservabilityEvent(new State(jobStatusProps));
-    jobStatusProps2.putAll(gteEventMetadata2);
-    producer.emitObservabilityEvent(new State(jobStatusProps2));
-    Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
-    // Check number of meters
-    Assert.assertEquals(metrics.size(), 1);
-    Map<String, MetricData > metricsByName = 
metrics.stream().collect(Collectors.toMap(metric -> metric.getName(), 
metricData -> metricData));
-    MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
-    // Check the attributes of the metrics
-    List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
-    Assert.assertEquals(datapoints.size(), 2);
-    // Check that the values are different for the two events (order not 
guaranteed for the same collection event)
-    Assert.assertNotEquals(datapoints.get(0).getValue(), 
datapoints.get(1).getValue());
-    
Assert.assertNotEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")),
-        
datapoints.get(1).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")));
-
-    // Check common string tag
-    
Assert.assertEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
 flowGroup);
-    
Assert.assertEquals(datapoints.get(1).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
 flowGroup);
-    datapoints.forEach(point -> {
-      if 
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(1L))
 {
-        Assert.assertEquals(point.getValue(), 0); // Cancelled job should show 
up as a 0
-      } else if 
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(2L))
 {
-        Assert.assertEquals(point.getValue(), 1L); // Completed job should 
show up as a 1
-      }
-      
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowName")),
 flowName);
-      
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("jobName")),
 jobName);
-      
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowEdge")),
 "flowEdge");
-      
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("specExecutor")),
 "specExecutor");
-    });
+    try (MockGaaSJobObservabilityEventProducer producer =
+                 new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true)) {
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
+
+      Map<String, String> gteEventMetadata2 = Maps.newHashMap();
+      gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"2");
+      gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      
gteEventMetadata2.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata2.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+
+      Properties jobStatusProps = new Properties();
+      Properties jobStatusProps2 = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);    // Ensure that this doesn't 
throw due to NPE
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+      jobStatusProps2.putAll(gteEventMetadata2);
+      producer.emitObservabilityEvent(new State(jobStatusProps2));
+      Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+      // Check number of meters
+      Assert.assertEquals(metrics.size(), 1);
+      Map<String, MetricData > metricsByName = 
metrics.stream().collect(Collectors.toMap(metric -> metric.getName(), 
metricData -> metricData));
+      MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+      // Check the attributes of the metrics
+      List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+      Assert.assertEquals(datapoints.size(), 2);
+      // Check that the values are different for the two events (order not 
guaranteed for the same collection event)
+      Assert.assertNotEquals(datapoints.get(0).getValue(), 
datapoints.get(1).getValue());
+      
Assert.assertNotEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")),
+              
datapoints.get(1).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")));
+
+      // Check common string tag
+      
Assert.assertEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
 flowGroup);
+      
Assert.assertEquals(datapoints.get(1).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
 flowGroup);
+      datapoints.forEach(point -> {
+        if 
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(1L))
 {
+          Assert.assertEquals(point.getValue(), 0); // Cancelled job should 
show up as a 0
+        } else if 
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(2L))
 {
+          Assert.assertEquals(point.getValue(), 1L); // Completed job should 
show up as a 1
+        }
+        
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowName")),
 flowName);
+        
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("jobName")),
 jobName);
+        
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowEdge")),
 "flowEdge");
+        
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("specExecutor")),
 "specExecutor");
+      });
+    }
+  }
+
+  @Test
+  public void testMockProduceMetrics_dimensionsMapNoFallback() throws 
Exception {
+    String flowGroup = "testFlowGroupMapNoFallback";
+    String flowName = "testFlowNameMapNoFallback";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobNameMapNoFallback");
+
+    State producerState = new State();
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    // Use a non-baseline field to prove we're not falling back to the 
hardcoded attributes,
+    // and that non-baseline mappings are honored.
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
+            "{\"status\":\"jobStatus\"}");
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY,
 "true");
+
+    try (MockGaaSJobObservabilityEventProducer producer =
+                 new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true)) {
+      Properties jobProps = new Properties();
+      
jobProps.setProperty(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP,
 "java_version,tag");
+      jobProps.setProperty("java_version", "11");
+      jobProps.setProperty("tag", "foo");
+
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+      gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobProps));
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+      Map<String, MetricData> metricsByName =
+              metrics.stream().collect(Collectors.toMap(metric -> 
metric.getName(), metricData -> metricData));
+      MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+      List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+      Assert.assertEquals(datapoints.size(), 1);
+
+      Map<AttributeKey<?>, Object> attrs = 
datapoints.get(0).getAttributes().asMap();
+
+      // From dimensionsMap
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("status")), 
"SUCCEEDED");
+
+      // Baseline dimensions should always be present (backfilled if missing 
from the dimensionsMap)
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("specExecutor")), 
"specExecutor");
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowEdge")), 
"flowEdge");
+
+      // Extra dimensions from job props
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("java_version")), 
"11");
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("tag")), "foo");
+    }
+  }
+
+  @Test
+  public void 
testMockProduceMetrics_dimensionsMapMissingBaselineDimsStillEmitted() throws 
Exception {
+    String flowGroup = "testFlowGroupBaselineBackfill";
+    String flowName = "testFlowNameBaselineBackfill";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobNameBaselineBackfill");
+
+    State producerState = new State();
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    // Intentionally omit baseline keys like 
flowName/flowGroup/jobName/flowExecutionId to ensure they are backfilled.
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
+        "{\"status\":\"jobStatus\"}");
+
+    try (MockGaaSJobObservabilityEventProducer producer =
+        new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true)) {
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+      Map<String, MetricData> metricsByName =
+          metrics.stream().collect(Collectors.toMap(metric -> 
metric.getName(), metricData -> metricData));
+      MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+      List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+      Assert.assertEquals(datapoints.size(), 1);
+      Map<AttributeKey<?>, Object> attrs = 
datapoints.get(0).getAttributes().asMap();
+
+      // Configured key
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("status")), 
"SUCCEEDED");
+
+      // Backfilled baseline keys
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowName")), 
flowName);
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowGroup")), 
flowGroup);
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("jobName")), 
jobName);
+      Assert.assertEquals(attrs.get(AttributeKey.longKey("flowExecutionId")), 
1L);
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("specExecutor")), 
"specExecutor");
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowEdge")), 
"flowEdge");
+    }
+  }
+
+  @Test
+  public void testMockProduceMetrics_dimensionsMapDuplicateObsFieldAllowed() 
throws Exception {
+    String flowGroup = "testFlowGroupDupObsFieldAllowed";
+    String flowName = "testFlowNameDupObsFieldAllowed";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobNameDupObsFieldAllowed");
+
+    State producerState = new State();
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    // Map baseline source fields under additional keys; these should be 
allowed (multiple dimension keys may map to the same event field).
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
+        "{\"edgeId\":\"flowEdgeId\",\"executor\":\"executorId\"}");
+
+    try (MockGaaSJobObservabilityEventProducer producer =
+        new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true)) {
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+      Map<String, MetricData> metricsByName =
+          metrics.stream().collect(Collectors.toMap(MetricData::getName, 
metricData -> metricData));
+      MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+      List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+      Assert.assertEquals(datapoints.size(), 1);
+      Map<AttributeKey<?>, Object> attrs = 
datapoints.get(0).getAttributes().asMap();
+
+      // Orchestrator configured keys present.
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("edgeId")), 
"flowEdge");
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("executor")), 
"specExecutor");
+
+      // Baseline keys still present.
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowEdge")), 
"flowEdge");
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("specExecutor")), 
"specExecutor");
+    }
+  }
+
+  @Test
+  public void 
testMockProduceMetrics_noDimensionsMapExtraDimsStillHonoredWhenGateEnabled() 
throws Exception {
+    String flowGroup = "testFlowGroupNoMapExtraDims";
+    String flowName = "testFlowNameNoMapExtraDims";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobNameNoMapExtraDims");
+
+    State producerState = new State();
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    // No dimensionsMap set => fallback baseline attributes
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY,
 "true");
+
+    try (MockGaaSJobObservabilityEventProducer producer =
+        new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true)) {
+      Properties jobProps = new Properties();
+      
jobProps.setProperty(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP,
 "java_version,tag");
+      jobProps.setProperty("java_version", "11");
+      jobProps.setProperty("tag", "foo");
+
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+      gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobProps));
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+      Map<String, MetricData> metricsByName =
+          metrics.stream().collect(Collectors.toMap(MetricData::getName, 
metricData -> metricData));
+      MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+      List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+      Assert.assertEquals(datapoints.size(), 1);
+      Map<AttributeKey<?>, Object> attrs = 
datapoints.get(0).getAttributes().asMap();
+
+      // Baseline keys
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowName")), 
flowName);
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowGroup")), 
flowGroup);
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("jobName")), 
jobName);
+      Assert.assertEquals(attrs.get(AttributeKey.longKey("flowExecutionId")), 
1L);
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("specExecutor")), 
"specExecutor");
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowEdge")), 
"flowEdge");
+
+      // Extra dims still honored
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("java_version")), 
"11");
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("tag")), "foo");
+    }
+  }
+
+  @Test
+  public void testMockProduceMetrics_maxDimensionsCapsExtraDims() throws 
Exception {
+    String flowGroup = "testFlowGroupMaxDims";
+    String flowName = "testFlowNameMaxDims";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobNameMaxDims");
+
+    State producerState = new State();
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY,
 "true");
+    // Baseline has 6 dims; cap at 7 means we cannot add both extras, so we 
add none (all-or-nothing).
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_MAX_DIMENSIONS_KEY,
 "7");
+
+    try (MockGaaSJobObservabilityEventProducer producer =
+        new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true)) {
+      Properties jobProps = new Properties();
+      
jobProps.setProperty(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP,
 "java_version,tag");
+      jobProps.setProperty("java_version", "11");
+      jobProps.setProperty("tag", "foo");
+
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+      gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobProps));
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+      Map<String, MetricData> metricsByName =
+          metrics.stream().collect(Collectors.toMap(MetricData::getName, 
metricData -> metricData));
+      MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+      List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+      Assert.assertEquals(datapoints.size(), 1);
+      Map<AttributeKey<?>, Object> attrs = 
datapoints.get(0).getAttributes().asMap();
+
+      Assert.assertNull(attrs.get(AttributeKey.stringKey("java_version")));
+      Assert.assertNull(attrs.get(AttributeKey.stringKey("tag")));
+    }
+  }
+
+  @Test
+  public void testMockProduceMetrics_maxDimensionsAllowsAllExtraDims() throws 
Exception {
+    String flowGroup = "testFlowGroupMaxDimsAllExtras";
+    String flowName = "testFlowNameMaxDimsAllExtras";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobNameMaxDimsAllExtras");
+
+    State producerState = new State();
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY,
 "true");
+    // Baseline(6) + extras(2) = 8 fits.
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_MAX_DIMENSIONS_KEY,
 "8");
+
+    try (MockGaaSJobObservabilityEventProducer producer =
+        new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true)) {
+      Properties jobProps = new Properties();
+      
jobProps.setProperty(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP,
 "java_version,tag");
+      jobProps.setProperty("java_version", "11");
+      jobProps.setProperty("tag", "foo");
+
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+      gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
PropertiesUtils.serialize(jobProps));
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+      Map<String, MetricData> metricsByName =
+          metrics.stream().collect(Collectors.toMap(MetricData::getName, 
metricData -> metricData));
+      MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+      List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+      Assert.assertEquals(datapoints.size(), 1);
+      Map<AttributeKey<?>, Object> attrs = 
datapoints.get(0).getAttributes().asMap();
+
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("java_version")), 
"11");
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("tag")), "foo");
+    }
+  }
+
+  @Test
+  public void 
testMockProduceMetrics_maxDimensionsExceededFallsBackToBaseline() throws 
Exception {
+    String flowGroup = "testFlowGroupMaxDimsBaselineOnly";
+    String flowName = "testFlowNameMaxDimsBaselineOnly";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobNameMaxDimsBaselineOnly");
+
+    State producerState = new State();
+    
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
 "true");
+    // Effective map would be baseline(6) + configured(1) => 7, but cap at 6 
=> baseline only.
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_MAX_DIMENSIONS_KEY,
 "6");
+    
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
 "{\"status\":\"jobStatus\"}");
+
+    try (MockGaaSJobObservabilityEventProducer producer =
+        new MockGaaSJobObservabilityEventProducer(producerState, 
this.issueRepository, true)) {
+      Map<String, String> gteEventMetadata = Maps.newHashMap();
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+      
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
"flowEdge");
+      gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"specExecutor");
+      gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+
+      Properties jobStatusProps = new Properties();
+      jobStatusProps.putAll(gteEventMetadata);
+      producer.emitObservabilityEvent(new State(jobStatusProps));
+
+      Collection<MetricData> metrics = 
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+      Map<String, MetricData> metricsByName =
+          metrics.stream().collect(Collectors.toMap(MetricData::getName, 
metricData -> metricData));
+      MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+      List<LongPointData> datapoints = 
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+      Assert.assertEquals(datapoints.size(), 1);
+      Map<AttributeKey<?>, Object> attrs = 
datapoints.get(0).getAttributes().asMap();
+
+      Assert.assertNull(attrs.get(AttributeKey.stringKey("status")));
+      Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowName")), 
flowName);
+    }
+  }
+
+  @Test
+  public void testGetConfiguredJobSucceededDimensionsMap_validJson() {
+    State state = new State();
+    
state.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
 "{\"status\":\"jobStatus\",\"executor\":\"executorId\"}");
+
+    Map<String, String> result = 
GaaSJobObservabilityEventProducer.getConfiguredJobSucceededDimensionsMap(state);
+
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result.size(), 2);
+    Assert.assertEquals(result.get("status"), "jobStatus");
+    Assert.assertEquals(result.get("executor"), "executorId");
+  }
+
+  @Test
+  public void testGetConfiguredJobSucceededDimensionsMap_blankConfig() {
+    State state = new State();
+    
state.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
 "");
+
+    Map<String, String> result = 
GaaSJobObservabilityEventProducer.getConfiguredJobSucceededDimensionsMap(state);
+
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void testGetConfiguredJobSucceededDimensionsMap_invalidJson() {
+    State state = new State();
+    // Invalid JSON - malformed content (missing closing brace)
+    
state.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
 "{a:b,c:d");
+
+    Map<String, String> result = 
GaaSJobObservabilityEventProducer.getConfiguredJobSucceededDimensionsMap(state);
+
+    // Should return null and log warning on parse failure
+    Assert.assertNull(result);
   }
 
   private Issue createTestIssue(String summary, String code, IssueSeverity 
severity) {

Reply via email to