This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9e557b872f [GOBBLIN-2246] Make Orchestrator MDM metric dimension
configurable (#4164)
9e557b872f is described below
commit 9e557b872f626d3d054fb9d29ba499bd4ca0488e
Author: Gulbarga Adithya Rao <[email protected]>
AuthorDate: Thu Feb 19 16:35:21 2026 +0530
[GOBBLIN-2246] Make Orchestrator MDM metric dimension configurable (#4164)
Added configurable mdm dimensions to jobSucceeded
---
.../org/apache/gobblin/runtime/util/GsonUtils.java | 5 +
.../GaaSJobObservabilityEventProducer.java | 276 ++++++-
.../GaaSJobObservabilityProducerTest.java | 801 +++++++++++++++------
3 files changed, 865 insertions(+), 217 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/GsonUtils.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/GsonUtils.java
index 32a0672703..72b7aca4bf 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/GsonUtils.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/GsonUtils.java
@@ -24,6 +24,11 @@ import com.google.gson.GsonBuilder;
public final class GsonUtils {
+ /**
+ * Plain GSON serializer for general-purpose JSON parsing where no special
adapters are needed.
+ */
+ public static final Gson GSON = new Gson();
+
/**
* GSON serializer that can convert Java 8 date and time types to ISO
standard representation.
*/
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
index 860f3ea587..836584bf01 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java
@@ -21,16 +21,21 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Splitter;
import com.google.gson.reflect.TypeToken;
+import org.apache.avro.Schema.Field;
import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import lombok.extern.slf4j.Slf4j;
@@ -77,6 +82,55 @@ public abstract class GaaSJobObservabilityEventProducer
implements Closeable {
public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
public static final String GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME =
"jobSucceeded";
private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "-";
+ private static final Splitter COMMA_SPLITTER =
Splitter.on(',').omitEmptyStrings().trimResults();
+
+ /**
+ * JSON map config: {@code <mdm_dim_key>: <gaas_obs_event_field_key>}.
+ * This comes from orchestrator configs. This serves as default set of
dimensions to emit.
+ */
+ public static final String JOB_SUCCEEDED_DIMENSIONS_MAP_KEY =
+ "metrics.reporting.opentelemetry.jobSucceeded.dimensionsMap";
+
+ /**
+ * This comes from orchestrator configs.
+ * Flag to enable/disable per-run extra dimensions for {@link
#GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME}.
+ */
+ public static final String JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY =
+
"metrics.reporting.opentelemetry.jobSucceeded.extraDimensions.enabled";
+
+ /**
+ * This comes from common.properties
+ * Job property (passed by template(/user)) listing extra dimension keys
(comma-separated).
+ */
+ public static final String JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP =
+ "metrics.reporting.opentelemetry.jobSucceeded.extraDimensions.keys";
+
+ /**
+ * This comes from orchestrator configs.
+ * Maximum number of dimensions to emit for {@code jobSucceeded}. Defaults
to 20.
+ */
+ public static final String JOB_SUCCEEDED_MAX_DIMENSIONS_KEY =
+ "metrics.reporting.opentelemetry.jobSucceeded.maxDimensions";
+ private static final int DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS = 20;
+
+ /**
+ * Baseline dimensions for the {@code jobSucceeded} metric. These should
always be present for backward compatibility,
+ * even if the orchestrator-provided {@link
#JOB_SUCCEEDED_DIMENSIONS_MAP_KEY} is incomplete.
+ *
+ * <p>Map values are Avro field names on {@link GaaSJobObservabilityEvent}
(strict).</p>
+ */
+ private static final Map<String, String>
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP =
createJobSucceededBaselineDimensionsMap();
+
+ private static Map<String, String> createJobSucceededBaselineDimensionsMap()
{
+ Map<String, String> baseline = new LinkedHashMap<>();
+ baseline.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, "flowName");
+ baseline.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, "flowGroup");
+ baseline.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, "jobName");
+ baseline.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"flowExecutionId");
+ baseline.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"executorId");
+ baseline.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "flowEdgeId");
+ return baseline;
+ }
protected MetricContext metricContext;
protected State state;
@@ -138,15 +192,221 @@ public abstract class GaaSJobObservabilityEventProducer
implements Closeable {
this.eventCollector.add(event);
}
+ /**
+ * Creates dimensions for the OpenTelemetry event based on the configured
mapping of MDM dimension keys to GaaS observability event field keys.
+ * If the config is not present or fails to parse, falls back to a default
set of dimensions using hardcoded mappings.
+ * The dimension values are read from the GaaS observability event.
+ * If a value is missing or cannot be read, it defaults to "NA".
+ * @param event
+ * @return
+ */
public Attributes getEventAttributes(GaaSJobObservabilityEvent event) {
- Attributes tags =
Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
getOrDefault(event.getFlowName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
getOrDefault(event.getFlowGroup(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
getOrDefault(event.getJobName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
event.getFlowExecutionId())
- .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
getOrDefault(event.getExecutorId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
getOrDefault(event.getFlowEdgeId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE))
- .build();
- return tags;
+ int maxDimensions = getMaxJobSucceededDimensions();
+ Map<String, String> configuredMap =
getConfiguredJobSucceededDimensionsMap(this.state);
+ if (configuredMap == null || configuredMap.isEmpty()) {
+ // Backward-compatible fallback: always emit the baseline dimensions.
+ // If the extra-dimensions gate is enabled, honor per-run dimensions
from job properties.
+ AttributesBuilder builder = Attributes.builder();
+ for (Map.Entry<String, String> entry :
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+ addObsEventFieldAttribute(builder, entry.getKey(), entry.getValue(),
event);
+ }
+ addExtraDimensionsFromJobProperties(builder,
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP, event, maxDimensions);
+ return builder.build();
+ }
+
+ // Ensure baseline dimensions are always emitted, even if orchestrator
config is missing some keys.
+ Map<String, String> effectiveMap = new LinkedHashMap<>(configuredMap);
+ for (Map.Entry<String, String> baselineEntry :
JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.entrySet()) {
+ effectiveMap.putIfAbsent(baselineEntry.getKey(),
baselineEntry.getValue());
+ }
+
+ // If orchestrator config would push us past the cap, drop orchestrator
dimensions (keep baseline only).
+ if (effectiveMap.size() > maxDimensions) {
+ log.warn("jobSucceeded would emit {} baseline+orchestrator dimensions
which exceeds maxDimensions={} (`{}`); dropping orchestrator dimensions",
+ effectiveMap.size(), maxDimensions,
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY);
+ effectiveMap = new
LinkedHashMap<>(JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP);
+ }
+
+ AttributesBuilder builder = Attributes.builder();
+ for (Map.Entry<String, String> entry : effectiveMap.entrySet()) {
+ String mdmDimensionKey = entry.getKey();
+ String obsEventFieldKey = entry.getValue();
+ addObsEventFieldAttribute(builder, mdmDimensionKey, obsEventFieldKey,
event);
+ }
+
+ addExtraDimensionsFromJobProperties(builder, effectiveMap, event,
maxDimensions);
+
+ return builder.build();
+ }
+
+ private int getMaxJobSucceededDimensions() {
+ int jobSucceededMaxDimensions = DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS;
+ try {
+ jobSucceededMaxDimensions =
this.state.getPropAsInt(JOB_SUCCEEDED_MAX_DIMENSIONS_KEY,
DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS);
+ } catch (Exception e) {
+ log.warn("Invalid `{}` value; using default {}",
JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, DEFAULT_JOB_SUCCEEDED_MAX_DIMENSIONS, e);
+ }
+ int baselineSize = JOB_SUCCEEDED_BASELINE_DIMENSIONS_MAP.size();
+ if (jobSucceededMaxDimensions < baselineSize) {
+ log.warn("Configured `{}`={} is less than baseline dimension count {};
using {} instead",
+ JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, jobSucceededMaxDimensions,
baselineSize, baselineSize);
+ return baselineSize;
+ }
+ return jobSucceededMaxDimensions;
+ }
+
+ /**
+ * Reads the JSON string config for job succeeded dimensions map, which is
expected to be in the format of
+ * {@code <mdm_dim_key>: <gaas_obs_event_field_key>}.
+ * @param state
+ * @return
+ */
+ static Map<String, String> getConfiguredJobSucceededDimensionsMap(State
state) {
+ String raw = state.getProp(JOB_SUCCEEDED_DIMENSIONS_MAP_KEY, "");
+ if (StringUtils.isBlank(raw)) {
+ return null;
+ }
+ try {
+ Type type = new TypeToken<Map<String, String>>() {}.getType();
+ Map<String, String> parsed = GsonUtils.GSON.fromJson(raw, type);
+ if (parsed == null || parsed.isEmpty()) {
+ return null;
+ }
+ // Normalize into a deterministic insertion-order map.
+ return new LinkedHashMap<>(parsed);
+ } catch (Exception e) {
+ log.warn("Failed parsing jobSucceeded dimensionsMap config `{}`; falling
back to hardcoded defaults. Raw value: {}",
+ JOB_SUCCEEDED_DIMENSIONS_MAP_KEY, raw, e);
+ return null;
+ }
+ }
+
+ /**
+ * Adds an attribute to the OpenTelemetry event builder based on the mapping
of MDM dimension key to GaaS observability event field key.
+ * @param builder
+ * @param mdmDimensionKey
+ * @param obsEventFieldKey
+ * @param event
+ */
+ private void addObsEventFieldAttribute(AttributesBuilder builder, String
mdmDimensionKey, String obsEventFieldKey,
+ GaaSJobObservabilityEvent event) {
+ if (StringUtils.isBlank(mdmDimensionKey) ||
StringUtils.isBlank(obsEventFieldKey)) {
+ return;
+ }
+ String normalizedEventKey = obsEventFieldKey.trim();
+
+ Object value = getObsEventFieldValue(normalizedEventKey, event);
+ if (value == null) {
+ builder.put(mdmDimensionKey, DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE);
+ return;
+ }
+ if (value instanceof Number) {
+ // OpenTelemetry attributes support numeric types, but for this metric
we standardize all numeric
+ // dimension values as a LONG attribute for consistency (dimensions are
expected to be discrete).
+ // Note: if a floating-point value (e.g., Double) ever shows up here,
the fractional part will be truncated.
+ builder.put(mdmDimensionKey, ((Number) value).longValue());
+ return;
+ }
+ if (value instanceof CharSequence) {
+ builder.put(mdmDimensionKey, getOrDefault(value.toString(),
DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE));
+ return;
+ }
+ builder.put(mdmDimensionKey, getOrDefault(String.valueOf(value),
DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE));
+ }
+
+ /**
+ * Generic getter for {@link GaaSJobObservabilityEvent} fields used for
dimension resolution.
+ *
+ * <p>Reads values by field name from the Avro schema (SpecificRecord {@code
getSchema()/get(int)}),
+ */
+ private static Object getObsEventFieldValue(String obsEventFieldKey,
GaaSJobObservabilityEvent event) {
+ try {
+ Field field = event.getSchema().getField(obsEventFieldKey);
+ if (field == null) {
+ return null;
+ }
+ return event.get(field.pos());
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ /**
+ * Reads extra dimension keys from job properties, and adds them as
attributes if they are not already present in the default dimensions.
+ * The value for the extra dimension is read from job properties using the
same key.
+ * This allows users to specify additional dimensions on a per-job basis
without needing to change the service configuration.
+ * @param builder
+ * @param defaultDims
+ * @param event
+ */
+ private void addExtraDimensionsFromJobProperties(AttributesBuilder builder,
Map<String, String> defaultDims,
+ GaaSJobObservabilityEvent
event, int maxDimensions) {
+ if
(!this.state.getPropAsBoolean(JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY,
false)) {
+ return;
+ }
+
+ Map<String, String> extraDims =
getExtraDimensionsFromJobProperties(defaultDims, event);
+ if (extraDims.isEmpty()) {
+ return;
+ }
+
+ int total = defaultDims.size() + extraDims.size();
+ if (total > maxDimensions) {
+ log.warn("jobSucceeded extra dimensions would exceed maxDimensions={}
(`{}`): default={} + extra={} => {}; skipping all extra dimensions",
+ maxDimensions, JOB_SUCCEEDED_MAX_DIMENSIONS_KEY, defaultDims.size(),
extraDims.size(), total);
+ return;
+ }
+
+ for (Map.Entry<String, String> entry : extraDims.entrySet()) {
+ builder.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private static Map<String, String>
getExtraDimensionsFromJobProperties(Map<String, String> defaultDims,
+ GaaSJobObservabilityEvent event) {
+ Map<String, String> jobProps = parseJobPropertiesJson(event);
+ if (jobProps == null || jobProps.isEmpty()) {
+ return new LinkedHashMap<>();
+ }
+ String extraDimensionKeys =
jobProps.getOrDefault(JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP, "");
+ if (StringUtils.isBlank(extraDimensionKeys)) {
+ return new LinkedHashMap<>();
+ }
+
+ Map<String, String> extras = new LinkedHashMap<>();
+ for (String key : COMMA_SPLITTER.split(extraDimensionKeys)) {
+ if (StringUtils.isBlank(key)) {
+ continue;
+ }
+ // Do not override service-default keys (map keys are the emitted OTel
keys)
+ if (defaultDims.containsKey(key)) {
+ continue;
+ }
+ String value = jobProps.get(key);
+ if (StringUtils.isBlank(value)) {
+ continue;
+ }
+ extras.put(key, value);
+ }
+ return extras;
+ }
+
+ /**
+ * Parses the job properties JSON string from the event into a Map. Returns
null if the raw string is blank or if parsing fails.
+ * @param event
+ * @return
+ */
+ private static Map<String, String>
parseJobPropertiesJson(GaaSJobObservabilityEvent event) {
+ try {
+ String raw = event.getJobProperties();
+ if (StringUtils.isBlank(raw)) {
+ return null;
+ }
+ Type type = new TypeToken<Map<String, String>>() {}.getType();
+ return GsonUtils.GSON.fromJson(raw, type);
+ } catch (Exception e) {
+ return null;
+ }
}
/**
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
index 366a1408f7..f168407f43 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
@@ -88,64 +88,66 @@ public class GaaSJobObservabilityProducerTest {
State state = new State();
state.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME,
"testCluster");
- MockGaaSJobObservabilityEventProducer producer = new
MockGaaSJobObservabilityEventProducer(state, this.issueRepository, false);
- Map<String, String> gteEventMetadata = Maps.newHashMap();
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
-
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
flowExecutionId);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"sourceNode_destinationNode_flowEdge");
- gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
- gteEventMetadata.put(AzkabanProjectConfig.USER_TO_PROXY, "azkabanUser");
- gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
- gteEventMetadata.put(TimingEvent.JOB_START_TIME, "20");
- gteEventMetadata.put(TimingEvent.JOB_END_TIME, "100");
- gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
- gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
-
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
"20");
- gteEventMetadata.put(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(summaries));
- gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobProps));
- Properties jobStatusProps = new Properties();
- jobStatusProps.putAll(gteEventMetadata);
- producer.emitObservabilityEvent(new State(jobStatusProps));
-
- List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedJobEvents();
-
- Assert.assertEquals(emittedEvents.size(), 1);
- Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
- GaaSJobObservabilityEvent event = iterator.next();
- Assert.assertEquals(event.getFlowGroup(), flowGroup);
- Assert.assertEquals(event.getFlowName(), flowName);
- Assert.assertEquals(event.getJobName(), jobName);
- Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
- Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
- Assert.assertEquals(event.getExecutorUrl(), "hostName");
- Assert.assertEquals(event.getIssues().size(), 1);
- Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
- Assert.assertEquals(event.getSourceNode(), "sourceNode");
- Assert.assertEquals(event.getDestinationNode(), "destinationNode");
- Assert.assertEquals(event.getExecutorId(), "specExecutor");
- Assert.assertEquals(event.getEffectiveUserUrn(), "azkabanUser");
- Assert.assertEquals(event.getJobOrchestratedTimestamp(), Long.valueOf(1));
- Assert.assertEquals(event.getLastFlowModificationTimestamp(),
Long.valueOf(20));
- Assert.assertEquals(event.getJobStartTimestamp(), Long.valueOf(20));
- Assert.assertEquals(event.getJobEndTimestamp(), Long.valueOf(100));
- Assert.assertEquals(event.getDatasetsMetrics().size(), 2);
- Assert.assertEquals(event.getDatasetsMetrics().get(0).getDatasetUrn(),
dataset1.getDatasetUrn());
-
Assert.assertEquals(event.getDatasetsMetrics().get(0).getEntitiesWritten(),
Long.valueOf(dataset1.getRecordsWritten()));
- Assert.assertEquals(event.getDatasetsMetrics().get(0).getBytesWritten(),
Long.valueOf(dataset1.getBytesWritten()));
-
Assert.assertEquals(event.getDatasetsMetrics().get(0).getSuccessfullyCommitted(),
Boolean.valueOf(dataset1.isSuccessfullyCommitted()));
- Assert.assertEquals(event.getDatasetsMetrics().get(1).getDatasetUrn(),
dataset2.getDatasetUrn());
-
Assert.assertEquals(event.getDatasetsMetrics().get(1).getEntitiesWritten(),
Long.valueOf(dataset2.getRecordsWritten()));
- Assert.assertEquals(event.getDatasetsMetrics().get(1).getBytesWritten(),
Long.valueOf(dataset2.getBytesWritten()));
-
Assert.assertEquals(event.getDatasetsMetrics().get(1).getSuccessfullyCommitted(),
Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
- JsonParser.parseString(event.getJobProperties()); // Should not throw
- Assert.assertEquals(event.getGaasId(), "testCluster");
- AvroSerializer<GaaSJobObservabilityEvent> serializer = new
AvroBinarySerializer<>(
- GaaSJobObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
- );
- serializer.serializeRecord(event);
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(state,
this.issueRepository, false)) {
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
flowExecutionId);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"sourceNode_destinationNode_flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(AzkabanProjectConfig.USER_TO_PROXY, "azkabanUser");
+ gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+ gteEventMetadata.put(TimingEvent.JOB_START_TIME, "20");
+ gteEventMetadata.put(TimingEvent.JOB_END_TIME, "100");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+ gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD,
"20");
+ gteEventMetadata.put(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(summaries));
+ gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobProps));
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedJobEvents();
+
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event = iterator.next();
+ Assert.assertEquals(event.getFlowGroup(), flowGroup);
+ Assert.assertEquals(event.getFlowName(), flowName);
+ Assert.assertEquals(event.getJobName(), jobName);
+ Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
+ Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+ Assert.assertEquals(event.getExecutorUrl(), "hostName");
+ Assert.assertEquals(event.getIssues().size(), 1);
+ Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
+ Assert.assertEquals(event.getSourceNode(), "sourceNode");
+ Assert.assertEquals(event.getDestinationNode(), "destinationNode");
+ Assert.assertEquals(event.getExecutorId(), "specExecutor");
+ Assert.assertEquals(event.getEffectiveUserUrn(), "azkabanUser");
+ Assert.assertEquals(event.getJobOrchestratedTimestamp(),
Long.valueOf(1));
+ Assert.assertEquals(event.getLastFlowModificationTimestamp(),
Long.valueOf(20));
+ Assert.assertEquals(event.getJobStartTimestamp(), Long.valueOf(20));
+ Assert.assertEquals(event.getJobEndTimestamp(), Long.valueOf(100));
+ Assert.assertEquals(event.getDatasetsMetrics().size(), 2);
+ Assert.assertEquals(event.getDatasetsMetrics().get(0).getDatasetUrn(),
dataset1.getDatasetUrn());
+
Assert.assertEquals(event.getDatasetsMetrics().get(0).getEntitiesWritten(),
Long.valueOf(dataset1.getRecordsWritten()));
+ Assert.assertEquals(event.getDatasetsMetrics().get(0).getBytesWritten(),
Long.valueOf(dataset1.getBytesWritten()));
+
Assert.assertEquals(event.getDatasetsMetrics().get(0).getSuccessfullyCommitted(),
Boolean.valueOf(dataset1.isSuccessfullyCommitted()));
+ Assert.assertEquals(event.getDatasetsMetrics().get(1).getDatasetUrn(),
dataset2.getDatasetUrn());
+
Assert.assertEquals(event.getDatasetsMetrics().get(1).getEntitiesWritten(),
Long.valueOf(dataset2.getRecordsWritten()));
+ Assert.assertEquals(event.getDatasetsMetrics().get(1).getBytesWritten(),
Long.valueOf(dataset2.getBytesWritten()));
+
Assert.assertEquals(event.getDatasetsMetrics().get(1).getSuccessfullyCommitted(),
Boolean.valueOf(dataset2.isSuccessfullyCommitted()));
+ JsonParser.parseString(event.getJobProperties()); // Should not throw
+ Assert.assertEquals(event.getGaasId(), "testCluster");
+ try (AvroSerializer<GaaSJobObservabilityEvent> serializer = new
AvroBinarySerializer<>(
+ GaaSJobObservabilityEvent.SCHEMA$, new
NoopSchemaVersionWriter())) {
+ serializer.serializeRecord(event);
+ }
+ }
}
@Test
@@ -155,47 +157,48 @@ public class GaaSJobObservabilityProducerTest {
String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobName1");
String flowExecutionId = "1";
this.issueRepository.put(
- TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
- createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
- );
- MockGaaSJobObservabilityEventProducer
- producer = new MockGaaSJobObservabilityEventProducer(new State(),
this.issueRepository, false);
- Map<String, String> gteEventMetadata = Maps.newHashMap();
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
-
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
- gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
- gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.CANCELLED.name());
-
- Properties jobStatusProps = new Properties();
- jobStatusProps.putAll(gteEventMetadata);
- producer.emitObservabilityEvent(new State(jobStatusProps));
-
- List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedJobEvents();
-
- Assert.assertEquals(emittedEvents.size(), 1);
- Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
- GaaSJobObservabilityEvent event = iterator.next();
- Assert.assertEquals(event.getFlowGroup(), flowGroup);
- Assert.assertEquals(event.getFlowName(), flowName);
- Assert.assertEquals(event.getJobName(), jobName);
- Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
- Assert.assertEquals(event.getJobStatus(), JobStatus.CANCELLED);
- Assert.assertEquals(event.getIssues().size(), 1);
- Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
- Assert.assertEquals(event.getExecutorId(), "specExecutor");
- Assert.assertEquals(event.getJobOrchestratedTimestamp(), null);
- Assert.assertEquals(event.getJobStartTimestamp(), null);
- Assert.assertEquals(event.getEffectiveUserUrn(), null);
- Assert.assertEquals(event.getExecutorUrl(), null);
-
- AvroSerializer<GaaSJobObservabilityEvent> serializer = new
AvroBinarySerializer<>(
- GaaSJobObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
+ TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
+ createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
);
- serializer.serializeRecord(event);
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(new State(),
this.issueRepository, false)) {
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.CANCELLED.name());
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ List<GaaSJobObservabilityEvent> emittedEvents =
producer.getTestEmittedJobEvents();
+
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event = iterator.next();
+ Assert.assertEquals(event.getFlowGroup(), flowGroup);
+ Assert.assertEquals(event.getFlowName(), flowName);
+ Assert.assertEquals(event.getJobName(), jobName);
+ Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
+ Assert.assertEquals(event.getJobStatus(), JobStatus.CANCELLED);
+ Assert.assertEquals(event.getIssues().size(), 1);
+ Assert.assertEquals(event.getFlowEdgeId(), "flowEdge");
+ Assert.assertEquals(event.getExecutorId(), "specExecutor");
+ Assert.assertEquals(event.getJobOrchestratedTimestamp(), null);
+ Assert.assertEquals(event.getJobStartTimestamp(), null);
+ Assert.assertEquals(event.getEffectiveUserUrn(), null);
+ Assert.assertEquals(event.getExecutorUrl(), null);
+
+ try (AvroSerializer<GaaSJobObservabilityEvent> serializer = new
AvroBinarySerializer<>(
+ GaaSJobObservabilityEvent.SCHEMA$, new
NoopSchemaVersionWriter())) {
+ serializer.serializeRecord(event);
+ }
+ }
}
@Test
@@ -205,43 +208,44 @@ public class GaaSJobObservabilityProducerTest {
String jobName = JobStatusRetriever.NA_KEY;
String flowExecutionId = "1";
this.issueRepository.put(
- TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
- createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+ TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
+ createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
);
State producerState = new State();
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
"true");
- MockGaaSJobObservabilityEventProducer
- producer = new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, false);
- Map<String, String> gteEventMetadata = Maps.newHashMap();
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
-
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
- gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
- gteEventMetadata.put(SerializationConstants.FLOW_START_TIME_KEY, "1");
-
- Properties jobStatusProps = new Properties();
- jobStatusProps.putAll(gteEventMetadata);
- producer.emitObservabilityEvent(new State(jobStatusProps));
-
- List<GaaSFlowObservabilityEvent> emittedEvents =
producer.getTestEmittedFlowEvents();
-
- Assert.assertEquals(emittedEvents.size(), 1);
- Iterator<GaaSFlowObservabilityEvent> iterator = emittedEvents.iterator();
- GaaSFlowObservabilityEvent event = iterator.next();
- Assert.assertEquals(event.getFlowGroup(), flowGroup);
- Assert.assertEquals(event.getFlowName(), flowName);
- Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
- Assert.assertEquals(event.getFlowStatus(), FlowStatus.SUCCEEDED);
- Assert.assertNull(event.getEffectiveUserUrn());
- Assert.assertEquals(event.getFlowStartTimestamp(), Long.valueOf(1));
-
- AvroSerializer<GaaSFlowObservabilityEvent> serializer = new
AvroBinarySerializer<>(
- GaaSFlowObservabilityEvent.SCHEMA$, new NoopSchemaVersionWriter()
- );
- serializer.serializeRecord(event);
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, false)) {
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+ gteEventMetadata.put(SerializationConstants.FLOW_START_TIME_KEY, "1");
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ List<GaaSFlowObservabilityEvent> emittedEvents =
producer.getTestEmittedFlowEvents();
+
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSFlowObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSFlowObservabilityEvent event = iterator.next();
+ Assert.assertEquals(event.getFlowGroup(), flowGroup);
+ Assert.assertEquals(event.getFlowName(), flowName);
+ Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
+ Assert.assertEquals(event.getFlowStatus(), FlowStatus.SUCCEEDED);
+ Assert.assertNull(event.getEffectiveUserUrn());
+ Assert.assertEquals(event.getFlowStartTimestamp(), Long.valueOf(1));
+
+ try (AvroSerializer<GaaSFlowObservabilityEvent> serializer = new
AvroBinarySerializer<>(
+ GaaSFlowObservabilityEvent.SCHEMA$, new
NoopSchemaVersionWriter())) {
+ serializer.serializeRecord(event);
+ }
+ }
}
@Test
@@ -251,31 +255,31 @@ public class GaaSJobObservabilityProducerTest {
String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobName1");
String flowExecutionId = "1";
this.issueRepository.put(
- TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
- createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+ TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
+ createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
);
State producerState = new State();
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT,
"http://localhost:5000");
- MockGaaSJobObservabilityEventProducer
- producer = new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true);
-
- Map<String, String> gteEventMetadata = Maps.newHashMap();
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
-
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
- gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
- gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.CANCELLED.name());
-
- Properties jobStatusProps = new Properties();
- jobStatusProps.putAll(gteEventMetadata);
-
- // Ensure that this doesn't throw due to NPE
- producer.emitObservabilityEvent(new State(jobStatusProps));
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true)) {
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.CANCELLED.name());
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+
+ // Ensure that this doesn't throw due to NPE
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+ }
}
@Test
@@ -285,67 +289,446 @@ public class GaaSJobObservabilityProducerTest {
String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobName1");
String flowExecutionId = "1";
this.issueRepository.put(
- TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
- createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+ TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
+ createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
);
State producerState = new State();
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
- MockGaaSJobObservabilityEventProducer
- producer = new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true);
- Map<String, String> gteEventMetadata = Maps.newHashMap();
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
-
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
- gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
- gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
- gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.CANCELLED.name());
-
- Map<String, String> gteEventMetadata2 = Maps.newHashMap();
- gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
- gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
-
gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"2");
- gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
- gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
- gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
- gteEventMetadata2.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
- gteEventMetadata2.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
-
- Properties jobStatusProps = new Properties();
- Properties jobStatusProps2 = new Properties();
- jobStatusProps.putAll(gteEventMetadata); // Ensure that this doesn't
throw due to NPE
- producer.emitObservabilityEvent(new State(jobStatusProps));
- jobStatusProps2.putAll(gteEventMetadata2);
- producer.emitObservabilityEvent(new State(jobStatusProps2));
- Collection<MetricData> metrics =
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
- // Check number of meters
- Assert.assertEquals(metrics.size(), 1);
- Map<String, MetricData > metricsByName =
metrics.stream().collect(Collectors.toMap(metric -> metric.getName(),
metricData -> metricData));
- MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
- // Check the attributes of the metrics
- List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
- Assert.assertEquals(datapoints.size(), 2);
- // Check that the values are different for the two events (order not
guaranteed for the same collection event)
- Assert.assertNotEquals(datapoints.get(0).getValue(),
datapoints.get(1).getValue());
-
Assert.assertNotEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")),
-
datapoints.get(1).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")));
-
- // Check common string tag
-
Assert.assertEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
flowGroup);
-
Assert.assertEquals(datapoints.get(1).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
flowGroup);
- datapoints.forEach(point -> {
- if
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(1L))
{
- Assert.assertEquals(point.getValue(), 0); // Cancelled job should show
up as a 0
- } else if
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(2L))
{
- Assert.assertEquals(point.getValue(), 1L); // Completed job should
show up as a 1
- }
-
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowName")),
flowName);
-
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("jobName")),
jobName);
-
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowEdge")),
"flowEdge");
-
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("specExecutor")),
"specExecutor");
- });
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true)) {
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.CANCELLED.name());
+
+ Map<String, String> gteEventMetadata2 = Maps.newHashMap();
+ gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"2");
+ gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata2.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata2.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+
gteEventMetadata2.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata2.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+
+ Properties jobStatusProps = new Properties();
+ Properties jobStatusProps2 = new Properties();
+ jobStatusProps.putAll(gteEventMetadata); // Ensure that this doesn't
throw due to NPE
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+ jobStatusProps2.putAll(gteEventMetadata2);
+ producer.emitObservabilityEvent(new State(jobStatusProps2));
+ Collection<MetricData> metrics =
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+ // Check number of meters
+ Assert.assertEquals(metrics.size(), 1);
+ Map<String, MetricData > metricsByName =
metrics.stream().collect(Collectors.toMap(metric -> metric.getName(),
metricData -> metricData));
+ MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+ // Check the attributes of the metrics
+ List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+ Assert.assertEquals(datapoints.size(), 2);
+ // Check that the values are different for the two events (order not
guaranteed for the same collection event)
+ Assert.assertNotEquals(datapoints.get(0).getValue(),
datapoints.get(1).getValue());
+
Assert.assertNotEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")),
+
datapoints.get(1).getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")));
+
+ // Check common string tag
+
Assert.assertEquals(datapoints.get(0).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
flowGroup);
+
Assert.assertEquals(datapoints.get(1).getAttributes().asMap().get(AttributeKey.stringKey("flowGroup")),
flowGroup);
+ datapoints.forEach(point -> {
+ if
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(1L))
{
+ Assert.assertEquals(point.getValue(), 0); // Cancelled job should
show up as a 0
+ } else if
(point.getAttributes().asMap().get(AttributeKey.longKey("flowExecutionId")).equals(2L))
{
+ Assert.assertEquals(point.getValue(), 1L); // Completed job should
show up as a 1
+ }
+
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowName")),
flowName);
+
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("jobName")),
jobName);
+
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("flowEdge")),
"flowEdge");
+
Assert.assertEquals(point.getAttributes().asMap().get(AttributeKey.stringKey("specExecutor")),
"specExecutor");
+ });
+ }
+ }
+
+ @Test
+ public void testMockProduceMetrics_dimensionsMapNoFallback() throws
Exception {
+ String flowGroup = "testFlowGroupMapNoFallback";
+ String flowName = "testFlowNameMapNoFallback";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobNameMapNoFallback");
+
+ State producerState = new State();
+
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
+ // Use a non-baseline field to prove we're not falling back to the
hardcoded attributes,
+ // and that non-baseline mappings are honored.
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
+ "{\"status\":\"jobStatus\"}");
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY,
"true");
+
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true)) {
+ Properties jobProps = new Properties();
+
jobProps.setProperty(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP,
"java_version,tag");
+ jobProps.setProperty("java_version", "11");
+ jobProps.setProperty("tag", "foo");
+
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+ gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobProps));
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ Collection<MetricData> metrics =
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+ Map<String, MetricData> metricsByName =
+ metrics.stream().collect(Collectors.toMap(metric ->
metric.getName(), metricData -> metricData));
+ MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+ List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+ Assert.assertEquals(datapoints.size(), 1);
+
+ Map<AttributeKey<?>, Object> attrs =
datapoints.get(0).getAttributes().asMap();
+
+ // From dimensionsMap
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("status")),
"SUCCEEDED");
+
+ // Baseline dimensions should always be present (backfilled if missing
from the dimensionsMap)
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("specExecutor")),
"specExecutor");
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowEdge")),
"flowEdge");
+
+ // Extra dimensions from job props
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("java_version")),
"11");
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("tag")), "foo");
+ }
+ }
+
+ @Test
+ public void
testMockProduceMetrics_dimensionsMapMissingBaselineDimsStillEmitted() throws
Exception {
+ String flowGroup = "testFlowGroupBaselineBackfill";
+ String flowName = "testFlowNameBaselineBackfill";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobNameBaselineBackfill");
+
+ State producerState = new State();
+
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
+ // Intentionally omit baseline keys like
flowName/flowGroup/jobName/flowExecutionId to ensure they are backfilled.
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
+ "{\"status\":\"jobStatus\"}");
+
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true)) {
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ Collection<MetricData> metrics =
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+ Map<String, MetricData> metricsByName =
+ metrics.stream().collect(Collectors.toMap(metric ->
metric.getName(), metricData -> metricData));
+ MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+ List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+ Assert.assertEquals(datapoints.size(), 1);
+ Map<AttributeKey<?>, Object> attrs =
datapoints.get(0).getAttributes().asMap();
+
+ // Configured key
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("status")),
"SUCCEEDED");
+
+ // Backfilled baseline keys
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowName")),
flowName);
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowGroup")),
flowGroup);
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("jobName")),
jobName);
+ Assert.assertEquals(attrs.get(AttributeKey.longKey("flowExecutionId")),
1L);
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("specExecutor")),
"specExecutor");
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowEdge")),
"flowEdge");
+ }
+ }
+
+ @Test
+ public void testMockProduceMetrics_dimensionsMapDuplicateObsFieldAllowed()
throws Exception {
+ String flowGroup = "testFlowGroupDupObsFieldAllowed";
+ String flowName = "testFlowNameDupObsFieldAllowed";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobNameDupObsFieldAllowed");
+
+ State producerState = new State();
+
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
+ // Map baseline source fields under additional keys; these should be
allowed (multiple dimension keys may map to the same event field).
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
+ "{\"edgeId\":\"flowEdgeId\",\"executor\":\"executorId\"}");
+
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true)) {
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ Collection<MetricData> metrics =
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+ Map<String, MetricData> metricsByName =
+ metrics.stream().collect(Collectors.toMap(MetricData::getName,
metricData -> metricData));
+ MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+ List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+ Assert.assertEquals(datapoints.size(), 1);
+ Map<AttributeKey<?>, Object> attrs =
datapoints.get(0).getAttributes().asMap();
+
+ // Orchestrator configured keys present.
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("edgeId")),
"flowEdge");
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("executor")),
"specExecutor");
+
+ // Baseline keys still present.
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowEdge")),
"flowEdge");
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("specExecutor")),
"specExecutor");
+ }
+ }
+
+ @Test
+ public void
testMockProduceMetrics_noDimensionsMapExtraDimsStillHonoredWhenGateEnabled()
throws Exception {
+ String flowGroup = "testFlowGroupNoMapExtraDims";
+ String flowName = "testFlowNameNoMapExtraDims";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobNameNoMapExtraDims");
+
+ State producerState = new State();
+
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
+ // No dimensionsMap set => fallback baseline attributes
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY,
"true");
+
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true)) {
+ Properties jobProps = new Properties();
+
jobProps.setProperty(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP,
"java_version,tag");
+ jobProps.setProperty("java_version", "11");
+ jobProps.setProperty("tag", "foo");
+
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+ gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobProps));
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ Collection<MetricData> metrics =
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+ Map<String, MetricData> metricsByName =
+ metrics.stream().collect(Collectors.toMap(MetricData::getName,
metricData -> metricData));
+ MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+ List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+ Assert.assertEquals(datapoints.size(), 1);
+ Map<AttributeKey<?>, Object> attrs =
datapoints.get(0).getAttributes().asMap();
+
+ // Baseline keys
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowName")),
flowName);
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowGroup")),
flowGroup);
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("jobName")),
jobName);
+ Assert.assertEquals(attrs.get(AttributeKey.longKey("flowExecutionId")),
1L);
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("specExecutor")),
"specExecutor");
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowEdge")),
"flowEdge");
+
+ // Extra dims still honored
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("java_version")),
"11");
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("tag")), "foo");
+ }
+ }
+
+ @Test
+ public void testMockProduceMetrics_maxDimensionsCapsExtraDims() throws
Exception {
+ String flowGroup = "testFlowGroupMaxDims";
+ String flowName = "testFlowNameMaxDims";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobNameMaxDims");
+
+ State producerState = new State();
+
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY,
"true");
+ // Baseline has 6 dims; cap at 7 means we cannot add both extras, so we
add none (all-or-nothing).
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_MAX_DIMENSIONS_KEY,
"7");
+
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true)) {
+ Properties jobProps = new Properties();
+
jobProps.setProperty(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP,
"java_version,tag");
+ jobProps.setProperty("java_version", "11");
+ jobProps.setProperty("tag", "foo");
+
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+ gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobProps));
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ Collection<MetricData> metrics =
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+ Map<String, MetricData> metricsByName =
+ metrics.stream().collect(Collectors.toMap(MetricData::getName,
metricData -> metricData));
+ MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+ List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+ Assert.assertEquals(datapoints.size(), 1);
+ Map<AttributeKey<?>, Object> attrs =
datapoints.get(0).getAttributes().asMap();
+
+ Assert.assertNull(attrs.get(AttributeKey.stringKey("java_version")));
+ Assert.assertNull(attrs.get(AttributeKey.stringKey("tag")));
+ }
+ }
+
+ @Test
+ public void testMockProduceMetrics_maxDimensionsAllowsAllExtraDims() throws
Exception {
+ String flowGroup = "testFlowGroupMaxDimsAllExtras";
+ String flowName = "testFlowNameMaxDimsAllExtras";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobNameMaxDimsAllExtras");
+
+ State producerState = new State();
+
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_ENABLED_KEY,
"true");
+ // Baseline(6) + extras(2) = 8 fits.
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_MAX_DIMENSIONS_KEY,
"8");
+
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true)) {
+ Properties jobProps = new Properties();
+
jobProps.setProperty(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_EXTRA_DIMENSIONS_KEYS_JOBPROP,
"java_version,tag");
+ jobProps.setProperty("java_version", "11");
+ jobProps.setProperty("tag", "foo");
+
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+ gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
PropertiesUtils.serialize(jobProps));
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ Collection<MetricData> metrics =
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+ Map<String, MetricData> metricsByName =
+ metrics.stream().collect(Collectors.toMap(MetricData::getName,
metricData -> metricData));
+ MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+ List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+ Assert.assertEquals(datapoints.size(), 1);
+ Map<AttributeKey<?>, Object> attrs =
datapoints.get(0).getAttributes().asMap();
+
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("java_version")),
"11");
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("tag")), "foo");
+ }
+ }
+
+ @Test
+ public void
testMockProduceMetrics_maxDimensionsExceededFallsBackToBaseline() throws
Exception {
+ String flowGroup = "testFlowGroupMaxDimsBaselineOnly";
+ String flowName = "testFlowNameMaxDimsBaselineOnly";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobNameMaxDimsBaselineOnly");
+
+ State producerState = new State();
+
producerState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
"true");
+ // Effective map would be baseline(6) + configured(1) => 7, but cap at 6
=> baseline only.
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_MAX_DIMENSIONS_KEY,
"6");
+
producerState.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
"{\"status\":\"jobStatus\"}");
+
+ try (MockGaaSJobObservabilityEventProducer producer =
+ new MockGaaSJobObservabilityEventProducer(producerState,
this.issueRepository, true)) {
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
"flowEdge");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"specExecutor");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ Collection<MetricData> metrics =
producer.getOpentelemetryMetrics().metricReader.collectAllMetrics();
+ Map<String, MetricData> metricsByName =
+ metrics.stream().collect(Collectors.toMap(MetricData::getName,
metricData -> metricData));
+ MetricData jobStatusMetric = metricsByName.get("jobSucceeded");
+ List<LongPointData> datapoints =
jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
+ Assert.assertEquals(datapoints.size(), 1);
+ Map<AttributeKey<?>, Object> attrs =
datapoints.get(0).getAttributes().asMap();
+
+ Assert.assertNull(attrs.get(AttributeKey.stringKey("status")));
+ Assert.assertEquals(attrs.get(AttributeKey.stringKey("flowName")),
flowName);
+ }
+ }
+
+ @Test
+ public void testGetConfiguredJobSucceededDimensionsMap_validJson() {
+ State state = new State();
+
state.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
"{\"status\":\"jobStatus\",\"executor\":\"executorId\"}");
+
+ Map<String, String> result =
GaaSJobObservabilityEventProducer.getConfiguredJobSucceededDimensionsMap(state);
+
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertEquals(result.get("status"), "jobStatus");
+ Assert.assertEquals(result.get("executor"), "executorId");
+ }
+
+ @Test
+ public void testGetConfiguredJobSucceededDimensionsMap_blankConfig() {
+ State state = new State();
+
state.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
"");
+
+ Map<String, String> result =
GaaSJobObservabilityEventProducer.getConfiguredJobSucceededDimensionsMap(state);
+
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void testGetConfiguredJobSucceededDimensionsMap_invalidJson() {
+ State state = new State();
+ // Invalid JSON - malformed content (missing closing brace)
+
state.setProp(GaaSJobObservabilityEventProducer.JOB_SUCCEEDED_DIMENSIONS_MAP_KEY,
"{a:b,c:d");
+
+ Map<String, String> result =
GaaSJobObservabilityEventProducer.getConfiguredJobSucceededDimensionsMap(state);
+
+ // Should return null and log warning on parse failure
+ Assert.assertNull(result);
}
private Issue createTestIssue(String summary, String code, IssueSeverity
severity) {