[
https://issues.apache.org/jira/browse/GOBBLIN-1764?focusedWorklogId=839166&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-839166
]
ASF GitHub Bot logged work on GOBBLIN-1764:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Jan/23 22:36
Start Date: 13/Jan/23 22:36
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1070071056
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -89,6 +97,7 @@ public class KafkaAvroJobStatusMonitorTest {
private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
private MetricContext context;
private KafkaAvroEventKeyValueReporter.Builder<?> builder;
+ private static Queue<GaaSObservabilityEventExperimental> queue = new
LinkedList<>();
Review Comment:
took me a minute to see how this gets wired in so that the events arrive on
it... now I found the static inner class. this strikes me as brittle, esp.
with `queue` being `static` and shared between all instances. still I
understand the difficulty you face when the system under test internally
instantiates its own `MockGaaSObservabilityEventProducer` from a class name.
hence, I'd accept those downsides, and just concentrate on readability.
either add a clearer comment up here saying it will be used in this
unconventional way, for the benefit of testing. or my own pref might be to
create a small singleton, known to both this test class and the mock, which
documents the arrangement, while encapsulating the queue. this could even be
used by other test classes that should produce observability events.
e.g.:
```
/** why this odd arrangement ... */
static class QueueSharingSingleton {
  private static QueueSharingSingleton INSTANCE = null;
  private final Queue<GaaSObservabilityEventExperimental> queue = new LinkedList<>();
  private QueueSharingSingleton() {}
  public static QueueSharingSingleton getInstance() { ... }
  public void putMessage(GaaSObservabilityEventExperimental msg) { queue.add(msg); }
  public List<GaaSObservabilityEventExperimental> peekAllMessages() { ... }
}
```
then initialize the above as:
```
private QueueSharingSingleton queueSharing =
QueueSharingSingleton.getInstance();
```
(and do the same in the mock)
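
to make the arrangement concrete, here is a minimal, self-contained sketch of such a singleton. `QueueSharingSingleton`, `putMessage` and `peekAllMessages` are the names proposed above; everything else is an assumption for illustration: `FakeEvent` is a hypothetical stand-in for `GaaSObservabilityEventExperimental` (so the sketch compiles on its own), and the eager initialization, the `ConcurrentLinkedQueue` and the `clear()` helper are choices this sketch makes, not something the PR prescribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for GaaSObservabilityEventExperimental, so the sketch compiles alone. */
final class FakeEvent {
  final String jobName;

  FakeEvent(String jobName) {
    this.jobName = jobName;
  }
}

/**
 * Shares one queue between a test class and a mock producer that the system
 * under test instantiates reflectively from a class name.
 */
final class QueueSharingSingleton {
  // Eagerly created, so getInstance() needs no synchronization.
  private static final QueueSharingSingleton INSTANCE = new QueueSharingSingleton();

  // Assumption: a concurrent queue, in case the producer runs on another thread.
  private final Queue<FakeEvent> queue = new ConcurrentLinkedQueue<>();

  private QueueSharingSingleton() {}

  static QueueSharingSingleton getInstance() {
    return INSTANCE;
  }

  void putMessage(FakeEvent msg) {
    queue.add(msg);
  }

  /** Returns a snapshot copy; the underlying queue is left untouched. */
  List<FakeEvent> peekAllMessages() {
    return new ArrayList<>(queue);
  }

  /** Assumption: tests reset the shared state between test methods. */
  void clear() {
    queue.clear();
  }
}
```

since the instance is still process-wide, tests sharing it would still need to run serially, or reset it with `clear()` before each test.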
##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSObservabilityEventExperimental.avsc:
##########
@@ -70,14 +70,20 @@
},
{
"name": "jobStartTime",
- "type": "long",
- "doc": "Start time of the job in millis since Epoch",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Start time of the job in millis since Epoch, null if the job was
never ran",
Review Comment:
minor, grammar: "was run", not "was ran"
(a second time below too)
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -618,8 +670,20 @@ public org.apache.gobblin.configuration.State
parseJobStatus(GobblinTrackingEven
int n = ++numFakeExceptionsFromParseJobStatus;
throw new RuntimeException(String.format("BOOM! Failure [%d] w/ event
at %d", n, event.getTimestamp()));
} else {
+
return super.parseJobStatus(event);
}
}
}
+
+ public static class MockGaaSObservabilityEventProducer extends
GaaSObservabilityEventProducer {
Review Comment:
javadoc to clarify for future maintainers
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -447,6 +456,45 @@ public void testProcessMessageForCancelledAndKilledEvent()
throws IOException, R
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
+ public void testJobMonitorCreatesGaaSObservabilityEvent() throws
IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createFlowSucceededEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ Config config = ConfigFactory.empty()
+
.withValue(GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_ENABLED,
ConfigValueFactory.fromAnyRef(true))
+
.withValue(GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS,
ConfigValueFactory.fromAnyRef(MockGaaSObservabilityEventProducer.class.getName()));
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), config);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
+ // Only the COMPLETE event should create a GaaSObservabilityEvent
+ Assert.assertEquals(queue.size(), 1);
+ GaaSObservabilityEventExperimental event = queue.poll();
+ Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
Review Comment:
would be better to verify more of the event--probably in a helper
method--its flow group+name, job name, etc.
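
a rough sketch of what such a helper could look like. `ObservedEvent` is a hypothetical stand-in carrying a few of the fields of `GaaSObservabilityEventExperimental`, and `assertEventMatches` is an invented name; in the real test the helper would take the Avro type and use the TestNG `Assert` methods instead of the hand-rolled check below.

```java
import java.util.Objects;

/** Hypothetical stand-in for GaaSObservabilityEventExperimental with a few of its fields. */
final class ObservedEvent {
  final String flowGroup;
  final String flowName;
  final String jobName;
  final String jobStatus;

  ObservedEvent(String flowGroup, String flowName, String jobName, String jobStatus) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.jobName = jobName;
    this.jobStatus = jobStatus;
  }
}

final class EventAssertions {
  private EventAssertions() {}

  /** Verifies several fields at once, so each test can check the whole event in one call. */
  static void assertEventMatches(ObservedEvent event, String flowGroup, String flowName,
      String jobName, String jobStatus) {
    check("flowGroup", event.flowGroup, flowGroup);
    check("flowName", event.flowName, flowName);
    check("jobName", event.jobName, jobName);
    check("jobStatus", event.jobStatus, jobStatus);
  }

  // Names the failing field in the message, which a bare equality check would not.
  private static void check(String field, Object actual, Object expected) {
    if (!Objects.equals(actual, expected)) {
      throw new AssertionError(field + ": expected <" + expected + "> but was <" + actual + ">");
    }
  }
}
```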
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java:
##########
@@ -65,9 +65,9 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
private Meter messageParseFailures;
public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
- JobIssueEventHandler jobIssueEventHandler)
+ JobIssueEventHandler jobIssueEventHandler, boolean
instrumentationEnabled)
Review Comment:
instrumentation is vague, and even so it suggests something different, such
as us measuring how fast the class operates as it handles messages. why not
call it what it is--`shouldEmitObservabilityEvents`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void
modifyStateIfRetryRequired(org.apache.gobblin.configuration.
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
}
+ private static boolean
isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState,
List<org.apache.gobblin.configuration.State> prevStates) {
+ Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(),
ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());
Review Comment:
may be better as a class `static`
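
for instance, something along these lines, where the set is built once at class-load time rather than on every call. the enum here is a hypothetical, abbreviated stand-in for `org.apache.gobblin.service.ExecutionStatus`, and `FinalStates`, `FINAL_STATES` and `isFinal` are invented names; the sketch sticks to JDK collections, though Guava's `ImmutableSet.of(...)` from the diff would serve equally well.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class FinalStates {
  /** Hypothetical, abbreviated stand-in for org.apache.gobblin.service.ExecutionStatus. */
  enum ExecutionStatus { COMPILED, ORCHESTRATED, RUNNING, COMPLETE, CANCELLED, FAILED }

  // Class-level constant: allocated once, shared by every call, and unmodifiable.
  static final Set<String> FINAL_STATES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
      ExecutionStatus.COMPLETE.name(),
      ExecutionStatus.CANCELLED.name(),
      ExecutionStatus.FAILED.name())));

  private FinalStates() {}

  /** A missing (null) status is treated as non-final. */
  static boolean isFinal(String status) {
    return status != null && FINAL_STATES.contains(status);
  }
}
```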
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -136,6 +140,12 @@ public <V> void onRetry(Attempt<V> attempt) {
}
}
}));
+
+ if (instrumentationEnabled && ConfigUtils.getBoolean(config,
GaaSObservabilityEventProducer.GAAS_OBSERVABILITY_EVENT_ENABLED, false)) {
+ this.eventProducer =
Optional.of(GaaSObservabilityEventProducer.getEventProducer(config,
jobIssueEventHandler.getIssueRepository()));
Review Comment:
slightly unconventional to construct the event producer internally from a
class name, rather than to make it a ctor param. using reflection internally
makes the tests more challenging to write, as it appears you've found. of
course, there's a place for reflection--just at a higher level than this class.
I do see how you need the issue repo from the issue handler to create the
observability event producer. how about wherever that handler gets created
and injected, that's where you'd inject an optional event producer?
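
a minimal sketch of the constructor-injection shape, under stated assumptions: `EventProducer`, `Monitor` and `RecordingProducer` are hypothetical stand-ins (for `GaaSObservabilityEventProducer`, `KafkaJobStatusMonitor` and a test double, respectively), and job states are reduced to plain strings. it only illustrates the wiring, not the real classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for GaaSObservabilityEventProducer. */
interface EventProducer {
  void emit(String jobState);
}

/** Hypothetical stand-in for KafkaJobStatusMonitor; the producer arrives as a ctor param. */
final class Monitor {
  private final Optional<EventProducer> eventProducer;

  Monitor(Optional<EventProducer> eventProducer) {
    this.eventProducer = eventProducer;
  }

  void onFinalState(String jobState) {
    // No reflection here: whoever wires the monitor decides which producer (if any) to use.
    eventProducer.ifPresent(producer -> producer.emit(jobState));
  }
}

/** A test double that needs no static state: the test owns the list it records into. */
final class RecordingProducer implements EventProducer {
  final List<String> emitted = new ArrayList<>();

  @Override
  public void emit(String jobState) {
    emitted.add(jobState);
  }
}
```

with this shape a test can pass in its own `RecordingProducer` and inspect `emitted` directly, so no `static` queue is needed; any reflection on a configured class name would then live in whatever code builds the monitor.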
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits
GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class, we need a sub system like Kakfa, which support
at least once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
"GaaSObservabilityEventProducer.";
+ public static final String GAAS_OBSERVABILITY_EVENT_ENABLED =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+ public static final String ISSUE_READ_ERROR_COUNT =
"GaaSObservability.producer.getIssuesFailedCount";
+
+ protected MetricContext metricContext;
+ protected State state;
+ protected MultiContextIssueRepository issueRepository;
+ ContextAwareMeter getIssuesFailedMeter;
+
+ public GaaSObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository) {
+ this.metricContext = Instrumented.getMetricContext(state, getClass());
+ getIssuesFailedMeter =
this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ISSUE_READ_ERROR_COUNT));
+ this.state = state;
+ this.issueRepository = issueRepository;
+ }
+
+ public void emitObservabilityEvent(State jobState) {
+ GaaSObservabilityEventExperimental event =
createGaaSObservabilityEvent(jobState);
+ sendUnderlyingEvent(event);
+ }
+
+ /**
+ * Emits the GaaSObservabilityEvent with the mechanism that the child class
is built upon e.g. Kafka
+ * @param event
+ */
+ abstract protected void
sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+
+ /**
+ * Creates a GaaSObservabilityEvent which is derived from a final GaaS job
pipeline state, which is combination of GTE job states in an ordered fashion
+ * @param jobState
+ * @return GaaSObservabilityEvent
+ */
+ private GaaSObservabilityEventExperimental
createGaaSObservabilityEvent(State jobState) {
+ Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ?
jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
+ Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ?
jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
+ GaaSObservabilityEventExperimental.Builder builder =
GaaSObservabilityEventExperimental.newBuilder();
+ List<Issue> issueList = null;
+ try {
+ issueList =
issueRepository.getAll(TroubleshooterUtils.getContextIdForJob(jobState.getProperties())).stream().map(
+ issue -> new
org.apache.gobblin.metrics.Issue(issue.getTime().toEpochSecond(),
+ IssueSeverity.valueOf(issue.getSeverity().toString()),
issue.getCode(), issue.getSummary(), issue.getDetails(),
issue.getProperties())).collect(Collectors.toList());
Review Comment:
seems ripe for abstracting entirely within a conversion function
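
roughly like this, as a sketch only. `RuntimeIssue` and `AvroIssue` are hypothetical, trimmed-down stand-ins for `org.apache.gobblin.runtime.troubleshooter.Issue` and the Avro-generated `org.apache.gobblin.metrics.Issue` (the real types carry more fields, e.g. details and properties), and `convert`/`convertAll` are invented names.

```java
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;

final class IssueConversion {
  private IssueConversion() {}

  /** Hypothetical stand-in for org.apache.gobblin.runtime.troubleshooter.Issue. */
  static final class RuntimeIssue {
    final ZonedDateTime time;
    final String severity;
    final String code;
    final String summary;

    RuntimeIssue(ZonedDateTime time, String severity, String code, String summary) {
      this.time = time;
      this.severity = severity;
      this.code = code;
      this.summary = summary;
    }
  }

  /** Hypothetical stand-in for the Avro-generated org.apache.gobblin.metrics.Issue. */
  static final class AvroIssue {
    final long timeEpochSecond;
    final String severity;
    final String code;
    final String summary;

    AvroIssue(long timeEpochSecond, String severity, String code, String summary) {
      this.timeEpochSecond = timeEpochSecond;
      this.severity = severity;
      this.code = code;
      this.summary = summary;
    }
  }

  /** The field-by-field mapping lives in one place, so callers never repeat it inline. */
  static AvroIssue convert(RuntimeIssue issue) {
    return new AvroIssue(issue.time.toEpochSecond(), issue.severity, issue.code, issue.summary);
  }

  static List<AvroIssue> convertAll(List<RuntimeIssue> issues) {
    return issues.stream().map(IssueConversion::convert).collect(Collectors.toList());
  }
}
```

the call site in `createGaaSObservabilityEvent` would then shrink to a single `convertAll(...)` over whatever the issue repository returns.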
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits
GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class, we need a sub system like Kakfa, which support
at least once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
"GaaSObservabilityEventProducer.";
+ public static final String GAAS_OBSERVABILITY_EVENT_ENABLED =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
Review Comment:
nit: I'd suffix these with `_KEY`, since they're not, say, a class, but a key
for specifying the class
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/JobIssueEventHandler.java:
##########
@@ -50,6 +51,7 @@ public class JobIssueEventHandler {
private static final Logger issueLogger =
LoggerFactory.getLogger("org.apache.gobblin.runtime.troubleshooter.JobIssueLogger");
+ @Getter
Review Comment:
any expectations to document here, such as only read-only use... or doesn't
matter?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits
GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class, we need a sub system like Kakfa, which support
at least once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
"GaaSObservabilityEventProducer.";
+ public static final String GAAS_OBSERVABILITY_EVENT_ENABLED =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+ public static final String ISSUE_READ_ERROR_COUNT =
"GaaSObservability.producer.getIssuesFailedCount";
Review Comment:
why not the same prefix... I see later it's not a config key, but even so,
why deviate?
as far as naming this var, shall we say something about `METRIC_NAME` or
`METRIC_KEY`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,149 @@
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits
GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class, we need a sub system like Kakfa, which support
at least once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
"GaaSObservabilityEventProducer.";
+ public static final String GAAS_OBSERVABILITY_EVENT_ENABLED =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "enabled";
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+ public static final String ISSUE_READ_ERROR_COUNT =
"GaaSObservability.producer.getIssuesFailedCount";
+
+ protected MetricContext metricContext;
+ protected State state;
+ protected MultiContextIssueRepository issueRepository;
+ ContextAwareMeter getIssuesFailedMeter;
Review Comment:
if visible for testing, let's add an annotation
##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.time.ZonedDateTime;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+
+import org.junit.Test;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class GaaSObservabilityProducerTest {
+
+ private MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ Queue<GaaSObservabilityEventExperimental> emittedEvents = new LinkedList<>();
+
+ @Test
+ public void testCreateGaaSObservabilityEvent() throws Exception {
+ String flowGroup = "testFlowGroup1";
+ String flowName = "testFlowName1";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobName1");
+ String flowExecutionId = "1";
+ this.issueRepository.put(
+ TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
+ createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+ );
+ GaaSObservabilityEventProducer producer = new
MockGaaSObservabilityProducer(new State(), this.issueRepository);
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+ gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
+ gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ Assert.assertEquals(emittedEvents.size(), 1);
+ GaaSObservabilityEventExperimental event = emittedEvents.poll();
+ Assert.assertEquals(event.getFlowGroup(), flowGroup);
+ Assert.assertEquals(event.getFlowName(), flowName);
+ Assert.assertEquals(event.getJobName(), jobName);
+ Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
+ Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+ Assert.assertEquals(event.getExecutorUrl(), "hostName");
+ Assert.assertEquals(event.getIssues().size(), 1);
+ }
+
+ private Issue createTestIssue(String summary, String code, IssueSeverity
severity) {
+ return
Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
+ }
+
+
+ public class MockGaaSObservabilityProducer extends
GaaSObservabilityEventProducer {
+ public MockGaaSObservabilityProducer(State state,
MultiContextIssueRepository issueRepository) {
+ super(state, issueRepository);
+ }
+ // Send the events to the class test queue, so tests should not run
concurrently
+ @Override
+ protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental
event) {
+ emittedEvents.add(event);
Review Comment:
unlike the other test, the queue is non-`static`... so does it support
concurrent test exec?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void
modifyStateIfRetryRequired(org.apache.gobblin.configuration.
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
}
+ private static boolean
isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState,
List<org.apache.gobblin.configuration.State> prevStates) {
+ Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(),
ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());
+ if (prevStates.size() == 0) {
+ return
finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+ }
+ return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) &&
finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
+ &&
!finalStates.contains(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));
Review Comment:
though marginally more processing, this more clearly captures the intent and
handles a corner case (if even possible) of the state flip-flopping from final
to non-final back to final:
```
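// a Stream can only be consumed once, so collect to a List before counting and reading the last element
List<Boolean> statesFinality =
    Stream.concat(prevStates.stream(), Stream.of(currentState))
        .map(isFinalState).collect(Collectors.toList());
return statesFinality.stream().filter(Boolean::booleanValue).count() == 1 &&
    Iterables.getLast(statesFinality);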
```
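
a self-contained version of the same idea, as a sketch under assumptions: plain status strings stand in for `org.apache.gobblin.configuration.State` objects, the status names are the three from the diff, and `FinalTransition` / `isFirstTransitionToFinal` are invented names. it sticks to the JDK, so `List.get` replaces Guava's `Iterables.getLast`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FinalTransition {
  // Status names taken from the diff; plain strings stand in for State objects.
  private static final Set<String> FINAL_STATES =
      Collections.unmodifiableSet(new HashSet<>(Arrays.asList("COMPLETE", "CANCELLED", "FAILED")));

  private FinalTransition() {}

  /** True only when currentStatus is final and no earlier status was. */
  static boolean isFirstTransitionToFinal(String currentStatus, List<String> prevStatuses) {
    // Collect to a List: a Stream cannot be traversed twice.
    List<Boolean> finality = Stream.concat(prevStatuses.stream(), Stream.of(currentStatus))
        .map(FINAL_STATES::contains)
        .collect(Collectors.toList());
    long finalCount = finality.stream().filter(Boolean::booleanValue).count();
    // Exactly one final status in the whole history, and it is the newest one.
    return finalCount == 1 && finality.get(finality.size() - 1);
  }
}
```

with this, a history of `FAILED`, `RUNNING` followed by a current `COMPLETE` is rejected, whereas a check that looks only at the immediately preceding state would accept it.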
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void
modifyStateIfRetryRequired(org.apache.gobblin.configuration.
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
}
+ private static boolean
isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState,
List<org.apache.gobblin.configuration.State> prevStates) {
Review Comment:
nit: impl seems more like `isNewStateTransitionToFinal` or
`isFirstStateTransitionToFinal`
Issue Time Tracking
-------------------
Worklog Id: (was: 839166)
Time Spent: 1h 20m (was: 1h 10m)
> Emit GaaSObservabilityEvent
> ---------------------------
>
> Key: GOBBLIN-1764
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1764
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> GaaSObservabilityEvents are a new kind of event that provides a job summary from
> pipelines in GaaS. It differs from GobblinTrackingEvents in that it is emitted once per
> job pipeline, and it is intended to be easy to query and alert on.
> We want to emit this observability event from GaaS by deriving it from a
> job's job status. Since this feature is Experimental and WIP, it is not
> expected to fill out all of the fields immediately.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)