abhishekmjain commented on code in PR #4164:
URL: https://github.com/apache/gobblin/pull/4164#discussion_r2815097475


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -138,15 +192,261 @@ public void emitObservabilityEvent(final State jobState) 
{
     this.eventCollector.add(event);
   }
 
+  /**
+   * Creates dimensions for the OpenTelemetry event based on the configured 
mapping of MDM dimension keys to GaaS observability event field keys.
+   * If the config is not present or fails to parse, falls back to a default 
set of dimensions using hardcoded mappings.
+   * The dimension values are read from the GaaS observability event.
+   * If a value is missing or cannot be read, it defaults to "NA".
+   * @param event
+   * @return
+   */
   public Attributes getEventAttributes(GaaSJobObservabilityEvent event) {
-    Attributes tags = 
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
getOrDefault(event.getFlowName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
getOrDefault(event.getFlowGroup(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
getOrDefault(event.getJobName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
event.getFlowExecutionId())
-        .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
getOrDefault(event.getExecutorId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
getOrDefault(event.getFlowEdgeId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
-        .build();
-    return tags;
+    int maxDimensions = getMaxJobSucceededDimensions();
+    Map<String, String> configuredMap = 
getConfiguredJobSucceededDimensionsMap(this.state);
+    if (configuredMap == null || configuredMap.isEmpty()) {
+      // Backward-compatible fallback: always emit the baseline dimensions.
+      // If the extra-dimensions gate is enabled, honor per-run dimensions 
from job properties.
+      AttributesBuilder builder = Attributes.builder();
+      for (Map.Entry<String, String> entry : 
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+        addObsEventFieldAttribute(builder, entry.getKey(), entry.getValue(), 
event);
+      }
+      addExtraDimensionsFromJobProperties(builder, 
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP, event, maxDimensions);
+      return builder.build();
+    }
+
+    // Drop orchestrator dimensions that point to the same event field as a 
baseline dimension.
+    // This prevents capturing the same source field under multiple dimension 
keys.
+    Map<String, String> filteredConfiguredMap = 
dropDuplicateObsEventFieldMappings(configuredMap, 
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP);
+
+    // Ensure baseline dimensions are always emitted, even if orchestrator 
config is missing some keys.
+    Map<String, String> effectiveMap = new 
LinkedHashMap<>(filteredConfiguredMap);
+    for (Map.Entry<String, String> baselineEntry : 
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+      effectiveMap.putIfAbsent(baselineEntry.getKey(), 
baselineEntry.getValue());
+    }
+
+    // If orchestrator config would push us past the cap, drop orchestrator 
dimensions (keep baseline only).
+    if (effectiveMap.size() > maxDimensions) {
+      log.warn("jobSucceeded would emit {} baseline+orchestrator dimensions 
which exceeds maxDimensions={} (`{}`); dropping orchestrator dimensions",
+          effectiveMap.size(), maxDimensions, 
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY);
+      effectiveMap = new 
LinkedHashMap<>(JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP);
+    }
+
+    AttributesBuilder builder = Attributes.builder();
+    for (Map.Entry<String, String> entry : effectiveMap.entrySet()) {
+      String mdmDimensionKey = entry.getKey();
+      String obsEventFieldKey = entry.getValue();
+      addObsEventFieldAttribute(builder, mdmDimensionKey, obsEventFieldKey, 
event);
+    }
+
+    addExtraDimensionsFromJobProperties(builder, effectiveMap, event, 
maxDimensions);
+
+    return builder.build();
+  }
+
+  private int getMaxJobSucceededDimensions() {
+    int jobSucceededMaxDimensions = DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS;
+    try {
+      jobSucceededMaxDimensions = 
this.state.getPropAsInt(JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, 
DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS);
+    } catch (Exception e) {
+      log.warn("Invalid `{}` value; using default {}", 
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS, e);
+    }
+    int baselineSize = JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.size();
+    if (jobSucceededMaxDimensions < baselineSize) {
+      log.warn("Configured `{}`={} is less than baseline dimension count {}; 
using {} instead",
+          JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, jobSucceededMaxDimensions, 
baselineSize, baselineSize);
+      return baselineSize;
+    }
+    return jobSucceededMaxDimensions;
+  }
+
+  private static Map<String, String> 
dropDuplicateObsEventFieldMappings(Map<String, String> configuredMap,

Review Comment:
   Is this really required?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to