[ 
https://issues.apache.org/jira/browse/GOBBLIN-1764?focusedWorklogId=840096&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840096
 ]

ASF GitHub Bot logged work on GOBBLIN-1764:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jan/23 21:46
            Start Date: 18/Jan/23 21:46
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3623:
URL: https://github.com/apache/gobblin/pull/3623#discussion_r1080567642


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -387,62 +460,95 @@ public void 
testProcessProgressingMessageWhenNoPreviousStatus() throws IOExcepti
     Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
   }
 
-  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
-  public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
-    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
+  @Test (dependsOnMethods = 
"testProcessProgressingMessageWhenNoPreviousStatus")
+  public void testJobMonitorCreatesGaaSObservabilityEvent() throws 
IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic6");
 
     //Submit GobblinTrackingEvents to Kafka
     ImmutableList.of(
         createFlowCompiledEvent(),
-        createJobOrchestratedEvent(1, 4),
-        createJobSLAKilledEvent(),
-        createJobOrchestratedEvent(2, 4),
-        createJobStartSLAKilledEvent(),
-        // Verify that kill event will not retry
-        createJobOrchestratedEvent(3, 4),
-        createJobCancelledEvent()
+        createJobSucceededEvent()
     ).forEach(event -> {
       context.submitEvent(event);
       kafkaReporter.report();
     });
-
     try {
       Thread.sleep(1000);
-    } catch(InterruptedException ex) {
+    } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-
-    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty());
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer);
     jobStatusMonitor.buildMetricsContextAndMetrics();
     Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
-      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
-      this::convertMessageAndMetadataToDecodableKafkaRecord);
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
 
     State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
-    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+    // Only the COMPLETE event should create a GaaSObservabilityEvent
+    List<GaaSObservabilityEventExperimental> emittedEvents = 
mockEventProducer.getTestEmittedEvents();
+    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    //Job orchestrated for retrying
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    jobStatusMonitor.shutDown();
+  }
 
-    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
-    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+  @Test (dependsOnMethods = "testJobMonitorCreatesGaaSObservabilityEvent")
+  public void testObservabilityEventSingleEmission() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobCancelledEvent(),
+        createJobSucceededEvent() // This event should be ignored
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    MockGaaSObservabilityEventProducer mockEventProducer = new 
MockGaaSObservabilityEventProducer(
+        ConfigUtils.configToState(ConfigFactory.empty()), issueRepository);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);

Review Comment:
   perhaps not now... but definitely if you need to add further tests (as you 
fill in details of the observability event) - let's explore whether this 
repetitive boilerplate could live behind a reusable abstraction.



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -358,6 +370,66 @@ public void 
testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {

Review Comment:
   sorry, can't recall: was this in the prior revision or just introduced?
   
   I'm not clear whether it belongs in this PR or is unrelated...



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.Issue;
+import org.apache.gobblin.metrics.IssueSeverity;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+
+/**
+ * A class embedded within GaaS, running in the JobStatusMonitor, which emits 
GaaSObservabilityEvents after each job in a flow.
+ * This is an abstract class; it requires a subsystem such as Kafka, which supports 
at-least-once delivery, to emit the event.
+ */
+@Slf4j
+public abstract class GaaSObservabilityEventProducer implements Closeable {
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX = 
"GaaSObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = 
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
+  public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = 
NoopGaaSObservabilityEventProducer.class.getName();
+  public static final String ISSUES_READ_FAILED_METRIC_NAME =  
GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
+
+  protected MetricContext metricContext;
+  protected State state;
+  protected MultiContextIssueRepository issueRepository;
+  boolean instrumentationEnabled;
+  ContextAwareMeter getIssuesFailedMeter;
+
+  public GaaSObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
+    this.state = state;
+    this.issueRepository = issueRepository;
+    this.instrumentationEnabled = instrumentationEnabled;
+    if (this.instrumentationEnabled) {
+      this.metricContext = Instrumented.getMetricContext(state, getClass());
+      this.getIssuesFailedMeter = 
this.metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ISSUES_READ_FAILED_METRIC_NAME));
+    }
+  }
+
+  public void emitObservabilityEvent(State jobState) {

Review Comment:
   an argument could be made for this to be `final`
   
   update: below, I found out why you don't... perhaps document for maintainers?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+
+
+public class NoopGaaSObservabilityEventProducer extends 
GaaSObservabilityEventProducer {

Review Comment:
   good use of the Null Object pattern!
   
   any javadoc?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSObservabilityProducerTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.testng.Assert;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GaaSObservabilityEventExperimental;
+import org.apache.gobblin.metrics.JobStatus;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import 
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.Issue;
+import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
+import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class GaaSObservabilityProducerTest {
+
+  private MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+
+  @Test
+  public void testCreateGaaSObservabilityEvent() throws Exception {
+    String flowGroup = "testFlowGroup1";
+    String flowName = "testFlowName1";
+    String jobName = String.format("%s_%s_%s", flowGroup, flowName, 
"testJobName1");
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, 
flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    MockGaaSObservabilityProducer producer = new 
MockGaaSObservabilityProducer(new State(), this.issueRepository);
+    Map<String, String> gteEventMetadata = Maps.newHashMap();
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
+    
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"1");
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
+    gteEventMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
flowName);
+    gteEventMetadata.put(TimingEvent.METADATA_MESSAGE, "hostName");
+    gteEventMetadata.put(TimingEvent.METADATA_START_TIME, "1");
+    gteEventMetadata.put(TimingEvent.METADATA_END_TIME, "100");
+    gteEventMetadata.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.COMPLETE.name());
+
+    Properties jobStatusProps = new Properties();
+    jobStatusProps.putAll(gteEventMetadata);
+    producer.emitObservabilityEvent(new State(jobStatusProps));
+
+    List<GaaSObservabilityEventExperimental> emittedEvents = 
producer.getTestEmittedEvents();
+
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSObservabilityEventExperimental> iterator = 
emittedEvents.iterator();
+    GaaSObservabilityEventExperimental event = iterator.next();
+    Assert.assertEquals(event.getFlowGroup(), flowGroup);
+    Assert.assertEquals(event.getFlowName(), flowName);
+    Assert.assertEquals(event.getJobName(), jobName);
+    Assert.assertEquals(event.getFlowExecutionId(), 
Long.valueOf(flowExecutionId));
+    Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event.getExecutorUrl(), "hostName");
+    Assert.assertEquals(event.getIssues().size(), 1);
+  }
+
+  private Issue createTestIssue(String summary, String code, IssueSeverity 
severity) {
+    return 
Issue.builder().summary(summary).code(code).time(ZonedDateTime.now()).severity(severity).build();
+  }
+
+
+  public class MockGaaSObservabilityProducer extends 
GaaSObservabilityEventProducer {

Review Comment:
   didn't I see this impl above?  couldn't it be shared there (e.g. if you were 
to make this `public static`)?
   
   (or even put it into its own file)



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -527,13 +637,14 @@ private GobblinTrackingEvent createGTE(String eventName, 
Map<String, String> cus
     return new GobblinTrackingEvent(timestamp, namespace, eventName, metadata);
   }
 
-  MockKafkaAvroJobStatusMonitor 
createMockKafkaAvroJobStatusMonitor(AtomicBoolean 
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig) throws 
IOException, ReflectiveOperationException {
+  MockKafkaAvroJobStatusMonitor 
createMockKafkaAvroJobStatusMonitor(AtomicBoolean 
shouldThrowFakeExceptionInParseJobStatusToggle, Config additionalConfig,
+      GaaSObservabilityEventProducer eventProducer) throws IOException, 
ReflectiveOperationException {

Review Comment:
   your choice, but rather than updating so many calls to now take a no-op 
version, I might overload this method with a two-param form that merely 
forwards to the three-param one here after constructing the no-op instance.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void 
modifyStateIfRetryRequired(org.apache.gobblin.configuration.
     
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  private static boolean 
isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState, 
List<org.apache.gobblin.configuration.State> prevStates) {
+    Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(), 
ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());
+    if (prevStates.size() == 0) {
+      return 
finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+    }
+    return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) && 
finalStates.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
+        && 
!finalStates.contains(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));

Review Comment:
   agreed... that's what I meant by "perhaps not possible". So the difference 
would be purely stylistic; either form looks fine.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -288,6 +303,15 @@ private static void 
modifyStateIfRetryRequired(org.apache.gobblin.configuration.
     
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
   }
 
+  private static boolean 
isStateTransitionToFinal(org.apache.gobblin.configuration.State currentState, 
List<org.apache.gobblin.configuration.State> prevStates) {
+    Set<String> finalStates = ImmutableSet.of(ExecutionStatus.COMPLETE.name(), 
ExecutionStatus.CANCELLED.name(), ExecutionStatus.FAILED.name());

Review Comment:
   still thinking the same... why initialize on every invocation, rather than 
once when the class is loaded?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 840096)
    Time Spent: 4h 10m  (was: 4h)

> Emit GaaSObservabilityEvent
> ---------------------------
>
>                 Key: GOBBLIN-1764
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1764
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-service
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> GaaSObservabilityEvents are a new event type that provides a job summary from 
> pipelines in GaaS. They differ from GobblinTrackingEvents in that they are emitted 
> once per job pipeline, and they are intended to be easy to query and alert on.
> We want to emit this observability event from GaaS by deriving it from a 
> job's status. Since this feature is experimental and a work in progress, it is not 
> expected to populate all of the fields immediately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to