abhishekmjain commented on code in PR #4164:
URL: https://github.com/apache/gobblin/pull/4164#discussion_r2815097475
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -138,15 +192,261 @@ public void emitObservabilityEvent(final State jobState)
{
this.eventCollector.add(event);
}
+ /**
+ * Creates dimensions for the OpenTelemetry event based on the configured
mapping of MDM dimension keys to GaaS observability event field keys.
+ * If the config is not present or fails to parse, falls back to a default
set of dimensions using hardcoded mappings.
+ * The dimension values are read from the GaaS observability event.
+ * If a value is missing or cannot be read, it defaults to "NA".
+ * @param event
+ * @return
+ */
public Attributes getEventAttributes(GaaSJobObservabilityEvent event) {
- Attributes tags =
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
getOrDefault(event.getFlowName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
getOrDefault(event.getFlowGroup(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
getOrDefault(event.getJobName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
event.getFlowExecutionId())
- .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
getOrDefault(event.getExecutorId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
getOrDefault(event.getFlowEdgeId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .build();
- return tags;
+ int maxDimensions = getMaxJobSucceededDimensions();
+ Map<String, String> configuredMap =
getConfiguredJobSucceededDimensionsMap(this.state);
+ if (configuredMap == null || configuredMap.isEmpty()) {
+ // Backward-compatible fallback: always emit the baseline dimensions.
+ // If the extra-dimensions gate is enabled, honor per-run dimensions
from job properties.
+ AttributesBuilder builder = Attributes.builder();
+ for (Map.Entry<String, String> entry :
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+ addObsEventFieldAttribute(builder, entry.getKey(), entry.getValue(),
event);
+ }
+ addExtraDimensionsFromJobProperties(builder,
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP, event, maxDimensions);
+ return builder.build();
+ }
+
+ // Drop orchestrator dimensions that point to the same event field as a
baseline dimension.
+ // This prevents capturing the same source field under multiple dimension
keys.
+ Map<String, String> filteredConfiguredMap =
dropDuplicateObsEventFieldMappings(configuredMap,
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP);
+
+ // Ensure baseline dimensions are always emitted, even if orchestrator
config is missing some keys.
+ Map<String, String> effectiveMap = new
LinkedHashMap<>(filteredConfiguredMap);
+ for (Map.Entry<String, String> baselineEntry :
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+ effectiveMap.putIfAbsent(baselineEntry.getKey(),
baselineEntry.getValue());
+ }
+
+ // If orchestrator config would push us past the cap, drop orchestrator
dimensions (keep baseline only).
+ if (effectiveMap.size() > maxDimensions) {
+ log.warn("jobSucceeded would emit {} baseline+orchestrator dimensions
which exceeds maxDimensions={} (`{}`); dropping orchestrator dimensions",
+ effectiveMap.size(), maxDimensions,
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY);
+ effectiveMap = new
LinkedHashMap<>(JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP);
+ }
+
+ AttributesBuilder builder = Attributes.builder();
+ for (Map.Entry<String, String> entry : effectiveMap.entrySet()) {
+ String mdmDimensionKey = entry.getKey();
+ String obsEventFieldKey = entry.getValue();
+ addObsEventFieldAttribute(builder, mdmDimensionKey, obsEventFieldKey,
event);
+ }
+
+ addExtraDimensionsFromJobProperties(builder, effectiveMap, event,
maxDimensions);
+
+ return builder.build();
+ }
+
+ private int getMaxJobSucceededDimensions() {
+ int jobSucceededMaxDimensions = DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS;
+ try {
+ jobSucceededMaxDimensions =
this.state.getPropAsInt(JOB_SUCCEEDED_MAX_DIMENSIONS_KEY,
DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS);
+ } catch (Exception e) {
+ log.warn("Invalid `{}` value; using default {}",
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS, e);
+ }
+ int baselineSize = JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.size();
+ if (jobSucceededMaxDimensions < baselineSize) {
+ log.warn("Configured `{}`={} is less than baseline dimension count {};
using {} instead",
+ JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, jobSucceededMaxDimensions,
baselineSize, baselineSize);
+ return baselineSize;
+ }
+ return jobSucceededMaxDimensions;
+ }
+
+ private static Map<String, String>
dropDuplicateObsEventFieldMappings(Map<String, String> configuredMap,
Review Comment:
Is this really required?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org