abhishekmjain commented on code in PR #4164:
URL: https://github.com/apache/gobblin/pull/4164#discussion_r2814944419
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -138,15 +192,261 @@ public void emitObservabilityEvent(final State jobState)
{
this.eventCollector.add(event);
}
+ /**
+ * Creates dimensions for the OpenTelemetry event based on the configured
mapping of MDM dimension keys to GaaS observability event field keys.
+ * If the config is not present or fails to parse, falls back to a default
set of dimensions using hardcoded mappings.
+ * The dimension values are read from the GaaS observability event.
+ * If a value is missing or cannot be read, it defaults to "NA".
+ * @param event
+ * @return
+ */
public Attributes getEventAttributes(GaaSJobObservabilityEvent event) {
- Attributes tags =
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
getOrDefault(event.getFlowName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
getOrDefault(event.getFlowGroup(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
getOrDefault(event.getJobName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
event.getFlowExecutionId())
- .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
getOrDefault(event.getExecutorId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
getOrDefault(event.getFlowEdgeId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .build();
- return tags;
+ int maxDimensions = getMaxJobSucceededDimensions();
+ Map<String, String> configuredMap =
getConfiguredJobSucceededDimensionsMap(this.state);
+ if (configuredMap == null || configuredMap.isEmpty()) {
+ // Backward-compatible fallback: always emit the baseline dimensions.
+ // If the extra-dimensions gate is enabled, honor per-run dimensions
from job properties.
+ AttributesBuilder builder = Attributes.builder();
+ for (Map.Entry<String, String> entry :
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+ addObsEventFieldAttribute(builder, entry.getKey(), entry.getValue(),
event);
+ }
+ addExtraDimensionsFromJobProperties(builder,
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP, event, maxDimensions);
+ return builder.build();
+ }
+
+ // Drop orchestrator dimensions that point to the same event field as a
baseline dimension.
+ // This prevents capturing the same source field under multiple dimension
keys.
+ Map<String, String> filteredConfiguredMap =
dropDuplicateObsEventFieldMappings(configuredMap,
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP);
+
+ // Ensure baseline dimensions are always emitted, even if orchestrator
config is missing some keys.
+ Map<String, String> effectiveMap = new
LinkedHashMap<>(filteredConfiguredMap);
+ for (Map.Entry<String, String> baselineEntry :
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+ effectiveMap.putIfAbsent(baselineEntry.getKey(),
baselineEntry.getValue());
+ }
+
+ // If orchestrator config would push us past the cap, drop orchestrator
dimensions (keep baseline only).
+ if (effectiveMap.size() > maxDimensions) {
+ log.warn("jobSucceeded would emit {} baseline+orchestrator dimensions
which exceeds maxDimensions={} (`{}`); dropping orchestrator dimensions",
+ effectiveMap.size(), maxDimensions,
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY);
+ effectiveMap = new
LinkedHashMap<>(JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP);
+ }
+
+ AttributesBuilder builder = Attributes.builder();
+ for (Map.Entry<String, String> entry : effectiveMap.entrySet()) {
+ String mdmDimensionKey = entry.getKey();
+ String obsEventFieldKey = entry.getValue();
+ addObsEventFieldAttribute(builder, mdmDimensionKey, obsEventFieldKey,
event);
+ }
+
+ addExtraDimensionsFromJobProperties(builder, effectiveMap, event,
maxDimensions);
+
+ return builder.build();
+ }
+
+ private int getMaxJobSucceededDimensions() {
+ int jobSucceededMaxDimensions = DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS;
+ try {
+ jobSucceededMaxDimensions =
this.state.getPropAsInt(JOB_SUCCEEDED_MAX_DIMENSIONS_KEY,
DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS);
+ } catch (Exception e) {
+ log.warn("Invalid `{}` value; using default {}",
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS, e);
+ }
+ int baselineSize = JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.size();
+ if (jobSucceededMaxDimensions < baselineSize) {
+ log.warn("Configured `{}`={} is less than baseline dimension count {};
using {} instead",
+ JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, jobSucceededMaxDimensions,
baselineSize, baselineSize);
+ return baselineSize;
+ }
+ return jobSucceededMaxDimensions;
+ }
+
+ private static Map<String, String>
dropDuplicateObsEventFieldMappings(Map<String, String> configuredMap,
+ Map<String, String> baselineMap) {
+ Map<String, String> baselineObsFieldToKey = new LinkedHashMap<>();
+ for (Map.Entry<String, String> entry : baselineMap.entrySet()) {
+ if (StringUtils.isNotBlank(entry.getValue())) {
+ baselineObsFieldToKey.put(entry.getValue().trim(), entry.getKey());
+ }
+ }
+
+ Map<String, String> seenObsFieldToKey = new LinkedHashMap<>();
+ Map<String, String> filtered = new LinkedHashMap<>();
+ for (Map.Entry<String, String> entry : configuredMap.entrySet()) {
+ String dimensionKey = entry.getKey();
+ String obsFieldKey = entry.getValue();
+ if (StringUtils.isBlank(dimensionKey) ||
StringUtils.isBlank(obsFieldKey)) {
+ continue;
+ }
+ String normalizedObsFieldKey = obsFieldKey.trim();
+
+ String baselineKey = baselineObsFieldToKey.get(normalizedObsFieldKey);
+ if (baselineKey != null && !baselineKey.equals(dimensionKey)) {
+ log.warn("Ignoring jobSucceeded dimension mapping `{}` -> `{}` because
field `{}` is already captured under baseline dimension `{}`",
+ dimensionKey, obsFieldKey, normalizedObsFieldKey, baselineKey);
+ continue;
+ }
+
+ String previousKey = seenObsFieldToKey.get(normalizedObsFieldKey);
+ if (previousKey != null && !previousKey.equals(dimensionKey)) {
+ log.warn("Ignoring duplicate jobSucceeded dimension mapping `{}` ->
`{}` because field `{}` is already captured under dimension `{}`",
+ dimensionKey, obsFieldKey, normalizedObsFieldKey, previousKey);
+ continue;
+ }
+
+ seenObsFieldToKey.put(normalizedObsFieldKey, dimensionKey);
+ filtered.put(dimensionKey, obsFieldKey);
+ }
+ return filtered;
+ }
+
+ /**
+ * Reads the JSON string config for job succeeded dimensions map, which is
expected to be in the format of
+ * {@code <mdm_dim_key>: <gaas_obs_event_field_key>}.
+ * @param state
+ * @return
+ */
+ private static Map<String, String>
getConfiguredJobSucceededDimensionsMap(State state) {
+ String raw = state.getProp(JOB_SUCCEEDED_DIMENSIONS_MAP_KEY, "");
+ if (StringUtils.isBlank(raw)) {
+ return null;
+ }
+ try {
+ Type type = new TypeToken<Map<String, String>>() {}.getType();
+ Map<String, String> parsed = GsonUtils.GSON.fromJson(raw, type);
+ if (parsed == null || parsed.isEmpty()) {
+ return null;
+ }
+ // Normalize into a deterministic insertion-order map.
+ return new LinkedHashMap<>(parsed);
+ } catch (Exception e) {
+ log.warn("Failed parsing jobSucceeded dimensionsMap config `{}`; falling
back to hardcoded defaults. Raw value: {}",
+ JOB_SUCCEEDED_DIMENSIONS_MAP_KEY, raw, e);
+ return null;
+ }
+ }
+
+ /**
+ * Adds an attribute to the OpenTelemetry event builder based on the mapping
of MDM dimension key to GaaS observability event field key.
+ * @param builder
+ * @param mdmDimensionKey
+ * @param obsEventFieldKey
+ * @param event
+ */
+ private void addObsEventFieldAttribute(AttributesBuilder builder, String
mdmDimensionKey, String obsEventFieldKey,
+ GaaSJobObservabilityEvent event) {
+ if (StringUtils.isBlank(mdmDimensionKey) ||
StringUtils.isBlank(obsEventFieldKey)) {
+ return;
+ }
+ String normalizedEventKey = obsEventFieldKey.trim();
+
+ Object value = getObsEventFieldValue(normalizedEventKey, event);
+ if (value == null) {
+ builder.put(mdmDimensionKey, DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE);
+ return;
+ }
+ if (value instanceof Number) {
+ builder.put(mdmDimensionKey, ((Number) value).longValue());
+ return;
+ }
Review Comment:
While I agree that a dimension usually would not have floating-point values,
it's better to call that out somewhere.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]