[
https://issues.apache.org/jira/browse/GOBBLIN-1764?focusedWorklogId=840096&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840096
]
ASF GitHub Bot logged work on GOBBLIN-1764:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jan/23 21:46
Start Date: 18/Jan/23 21:46
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080567642
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -387,62 +460,95 @@ public void
testProcessProgressingMessageWhenNoPreviousStatus() throws IOExcepti
Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
}
- @Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
- public void testProcessMessageForCancelledAndKilledEvent() throws
IOException, ReflectiveOperationException {
- KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
+ @Test (dependsOnMethods =
"testProcessProgressingMessageWhenNoPreviousStatus")
+ public void testJobMonitorCreatesGaaSObservabilityEvent() throws
IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic6");
//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createFlowCompiledEvent(),
- createJobOrchestratedEvent(1, 4),
- createJobSLAKilledEvent(),
- createJobOrchestratedEvent(2, 4),
- createJobStartSLAKilledEvent(),
- // Verify that kill event will not retry
- createJobOrchestratedEvent(3, 4),
- createJobCancelledEvent()
+ createJobSucceededEvent()
).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
});
-
try {
Thread.sleep(1000);
- } catch(InterruptedException ex) {
+ } catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
-
- MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty());
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
+ ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer);
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
- this.kafkaTestHelper.getIteratorForTopic(TOPIC),
- this::convertMessageAndMetadataToDecodableKafkaRecord);
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
- state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
-
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
+ // Only the COMPLETE event should create a GaaSObservabilityEvent
+ List<GaaSObservabilityEventExperimental> emittedEvents =
mockEventProducer.getTestEmittedEvents();
+ Iterator<GaaSObservabilityEventExperimental> iterator =
emittedEvents.iterator();
+ GaaSObservabilityEventExperimental event1 = iterator.next();
+ Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+ Assert.assertEquals(event1.getFlowName(), this.flowName);
+ Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
- state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- //Job orchestrated for retrying
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ jobStatusMonitor.shutDown();
+ }
- state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
- Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+ @Test (dependsOnMethods = "testJobMonitorCreatesGaaSObservabilityEvent")
+ public void testObservabilityEventSingleEmission() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic7");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobCancelledEvent(),
+ createJobSucceededEvent() // This event should be ignored
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+ MockGaaSObservabilityEventProducer mockEventProducer = new
MockGaaSObservabilityEventProducer(
+ ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(),
+ mockEventProducer);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
Review Comment:
perhaps not now... but definitely if you need to add further tests (as you
fill in details of the observability event) - let's explore whether this
repetitive boilerplate could live behind a reusable abstraction.
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -358,6 +370,66 @@ public void
testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
+ public void testProcessMessageForCancelledAndKilledEvent() throws
IOException, ReflectiveOperationException {
Review Comment:
sorry, can't recall: was this in the prior revision or just introduced?
I'm not clear whether it belongs in this PR or is unrelated...
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+
+/**
+ * A class embedded within GaaS running in the JobStatusMonitor which emits
GaaSObservabilityEvents after each job in a flow
+ * This is an abstract class, we need a sub system like Kakfa, which support
at least once delivery, to emit the event
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
"GaaSObservabilityEventProducer.";
+ public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+ public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS =
NoopGaaSObservabilityEventProducer.class.getName();
+ public static final String ISSUES_READ_FAILED_METRIC_NAME =
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+
+ protected MetricContext metricContext;
+ protected State state;
+ protected MultiContextIssueRepository issueRepository;
+ boolean instrumentationEnabled;
+ ContextAwareMeter getIssuesFailedMeter;
+
+ public GaaSObservabilityEventProducer(State state,
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+ this.state = state;
+ this.issueRepository = issueRepository;
+ this.instrumentationEnabled = instrumentationEnabled;
+ if (this.instrumentationEnabled) {
+ this.metricContext = Instrumented.getMetricContext(state, getClass());
+ this.getIssuesFailedMeter =
this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ISSUES_READ_FAILED_METRIC_NAME));
+ }
+ }
+
+ public void emitObservabilityEvent(State jobState) {
Review Comment:
an argument could be made for this to be `final`
update: below, I found out why you don't... perhaps document for maintainers?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+
+
+public class NoopGaaSObservabilityEventProducer extends
GaaSObservabilityEventProducer {
Review Comment:
good use of the Null Pattern!
any javadoc?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class GaaSObservabilityProducerTest {
+
+ private MultiContextIssueRepository issueRepository = new
InMemoryMultiContextIssueRepository();
+
+ @Test
+ public void testCreateGaaSObservabilityEvent() throws Exception {
+ String flowGroup = "testFlowGroup1";
+ String flowName = "testFlowName1";
+ String jobName = String.format("%s_%s_%s", flowGroup, flowName,
"testJobName1");
+ String flowExecutionId = "1";
+ this.issueRepository.put(
+ TroubleshooterUtils.getContextIdForJob(flowGroup, flowName,
flowExecutionId, jobName),
+ createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+ );
+ MockGaaSObservabilityProducer producer = new
MockGaaSObservabilityProducer(new State(), this.issueRepository);
+ Map<String, String> gteEventMetadata = Maps.newHashMap();
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
+
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"1");
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
+ gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
flowName);
+ gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+ gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
+ gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+ gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.COMPLETE.name());
+
+ Properties jobStatusProps = new Properties();
+ jobStatusProps.putAll(gteEventMetadata);
+ producer.emitObservabilityEvent(new State(jobStatusProps));
+
+ List<GaaSObservabilityEventExperimental> emittedEvents =
producer.getTestEmittedEvents();
+
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSObservabilityEventExperimental> iterator =
emittedEvents.iterator();
+ GaaSObservabilityEventExperimental event = iterator.next();
+ Assert.assertEquals(event.getFlowGroup(), flowGroup);
+ Assert.assertEquals(event.getFlowName(), flowName);
+ Assert.assertEquals(event.getJobName(), jobName);
+ Assert.assertEquals(event.getFlowExecutionId(),
Long.valueOf(flowExecutionId));
+ Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+ Assert.assertEquals(event.getExecutorUrl(), "hostName");
+ Assert.assertEquals(event.getIssues().size(), 1);
+ }
+
+ private Issue createTestIssue(String summary, String code, IssueSeverity
severity) {
+ return
Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
+ }
+
+
+ public class MockGaaSObservabilityProducer extends
GaaSObservabilityEventProducer {
Review Comment:
didn't I see this impl above? couldn't it be shared there (e.g. if you were
to make this `public static`)?
(or even put it into its own file)
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -527,13 +637,14 @@ private GobblinTrackingEvent createGTE(String eventName,
Map<String, String> cus
return new GobblinTrackingEvent(timestamp, namespace, eventName, metadata);
}
- MockKafkaAvroJobStatusMonitor
createMockKafkaAvroJobStatusMonitor(AtomicBoolean
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig) throws
IOException, ReflectiveOperationException {
+ MockKafkaAvroJobStatusMonitor
createMockKafkaAvroJobStatusMonitor(AtomicBoolean
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
+ GaaSObservabilityEventProducer eventProducer) throws IOException,
ReflectiveOperationException {
Review Comment:
your choice, but rather than updating so many calls to now take a no-op
version, I might overload this method with a two-param form, that merely
forwards to the three-param one here, after constructing the no-op instance.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void
modifyStateIfRetryRequired(org.apache.gobblin.configuration.
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
}
+ private static boolean
isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState,
List<org.apache.gobblin.configuration.State> prevStates) {
+ Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(),
ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());
+ if (prevStates.size() == 0) {
+ return
finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+ }
+ return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) &&
finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
+ &&
!finalStates.contains(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));
Review Comment:
agreed... that's what I meant by perhaps not possible. so the difference
would be purely stylistic... either form looks fine
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void
modifyStateIfRetryRequired(org.apache.gobblin.configuration.
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
}
+ private static boolean
isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState,
List<org.apache.gobblin.configuration.State> prevStates) {
+ Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(),
ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());
Review Comment:
still thinking the same... why initialize on every invocation, rather than
once when the class is loaded?
Issue Time Tracking
-------------------
Worklog Id: (was: 840096)
Time Spent: 4h 10m (was: 4h)
> Emit GaaSObservabilityEvent
> ---------------------------
>
> Key: GOBBLIN-1764
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1764
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
> GaaSObservabilityEvents are a new kind of event that provides a job summary from
> pipelines in GaaS. They differ from GobblinTrackingEvents in that they are emitted
> once per job pipeline, and they are intended to be easy to query and alert on.
> We want to emit this observability event from GaaS by deriving it from a
> job's job status. Since this feature is Experimental and WIP, it is not
> expected to fill out all of the fields immediately.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)