This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d0135b064 [GOBBLIN-1652]Add more log in the KafkaJobStatusMonitor in 
case it fails to process one GobblinTrackingEvent (#3513)
d0135b064 is described below

commit d0135b06447a5a456042db75a0d6d772fba9ab4b
Author: Zihan Li <[email protected]>
AuthorDate: Tue Jun 14 13:21:01 2022 -0700

    [GOBBLIN-1652]Add more log in the KafkaJobStatusMonitor in case it fails to 
process one GobblinTrackingEvent (#3513)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1652]Add more log in the KafkaJobStatusMonitor in case it fails 
to process one GobblinTrackingEvent
    
    * add test
    
    * fix test
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 30 +++++++
 .../service/monitoring/KafkaJobStatusMonitor.java  | 91 ++++++++++++----------
 .../monitoring/MysqlJobStatusRetrieverTest.java    |  2 +-
 3 files changed, 80 insertions(+), 43 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index eb86ebfb1..fc5b87802 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -357,6 +358,35 @@ public class KafkaAvroJobStatusMonitorTest {
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
+  public void testProcessProgressingMessageWhenNoPreviousStatus() throws 
IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic5");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createGTE(TimingEvent.JOB_COMPLETION_PERCENTAGE, new HashMap<>())
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty());
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    // Verify we are able to process it without NPE
+    Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+  }
+
   @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
   public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 363d5c744..272c94977 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -221,56 +221,63 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   @VisibleForTesting
   static void addJobStatusToStateStore(org.apache.gobblin.configuration.State 
jobStatus, StateStore stateStore)
       throws IOException {
-    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
-      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
JobStatusRetriever.NA_KEY);
-    }
-    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
-      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
JobStatusRetriever.NA_KEY);
-    }
-    String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
-    String flowGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
-    String flowExecutionId = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
-    String jobName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
-    String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
-    String storeName = jobStatusStoreName(flowGroup, flowName);
-    String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
-    List<org.apache.gobblin.configuration.State> states = 
stateStore.getAll(storeName, tableName);
-    if (states.size() > 0) {
-      org.apache.gobblin.configuration.State previousJobStatus = 
states.get(states.size() - 1);
-      String previousStatus = 
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-      String currentStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-      int previousGeneration = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
 1);
-      // This is to make the change backward compatible as we may not have 
this info in cluster events
-      // If we does not have those info, we treat the event as coming from the 
same attempts as previous one
-      int currentGeneration = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 
previousGeneration);
-      int previousAttempts = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
 1);
-      int currentAttempts = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 
previousAttempts);
-      // We use three things to accurately count and thereby bound retries, 
even amidst out-of-order events (by skipping late arrivals).
-      // The generation is monotonically increasing, while the attempts may 
re-initialize back to 0. this two-part form prevents the composite value from 
ever repeating.
-      // And job status reflect the execution status in one attempt
-      if (previousStatus != null && currentStatus != null &&
-          (previousGeneration > currentGeneration
-              || (previousGeneration == currentGeneration && previousAttempts 
> currentAttempts)
-              || (previousGeneration == currentGeneration && previousAttempts 
== currentAttempts
-              && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
-        log.warn(String.format("Received status [generation.attempts] = %s 
[%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
-            currentStatus, currentGeneration, currentAttempts, previousStatus, 
previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId, 
jobGroup, jobName));
-        jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
-      } else {
-        jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+    try {
+      if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+        jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
JobStatusRetriever.NA_KEY);
+      }
+      if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) 
{
+        jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
JobStatusRetriever.NA_KEY);
+      }
+      String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+      String flowGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+      String flowExecutionId = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+      String jobName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+      String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+      String storeName = jobStatusStoreName(flowGroup, flowName);
+      String tableName = jobStatusTableName(flowExecutionId, jobGroup, 
jobName);
+
+      List<org.apache.gobblin.configuration.State> states = 
stateStore.getAll(storeName, tableName);
+      if (states.size() > 0) {
+        org.apache.gobblin.configuration.State previousJobStatus = 
states.get(states.size() - 1);
+        String previousStatus = 
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+        String currentStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+        int previousGeneration = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
 1);
+        // This is to make the change backward compatible as we may not have 
this info in cluster events
+        // If we does not have those info, we treat the event as coming from 
the same attempts as previous one
+        int currentGeneration = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 
previousGeneration);
+        int previousAttempts = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
 1);
+        int currentAttempts = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 
previousAttempts);
+        // We use three things to accurately count and thereby bound retries, 
even amidst out-of-order events (by skipping late arrivals).
+        // The generation is monotonically increasing, while the attempts may 
re-initialize back to 0. this two-part form prevents the composite value from 
ever repeating.
+        // And job status reflect the execution status in one attempt
+        if (previousStatus != null && currentStatus != null && 
(previousGeneration > currentGeneration || (
+            previousGeneration == currentGeneration && previousAttempts > 
currentAttempts) || (previousGeneration == currentGeneration && 
previousAttempts == currentAttempts
+            && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+            < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+          log.warn(String.format(
+              "Received status [generation.attempts] = %s [%s.%s] when already 
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+              currentStatus, currentGeneration, currentAttempts, 
previousStatus, previousGeneration, previousAttempts,
+              flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+          jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+        } else {
+          jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+        }
       }
-    }
 
-    modifyStateIfRetryRequired(jobStatus);
-    stateStore.put(storeName, tableName, jobStatus);
+      modifyStateIfRetryRequired(jobStatus);
+      stateStore.put(storeName, tableName, jobStatus);
+    } catch (Exception e) {
+      log.warn("Meet exception when adding jobStatus to state store at "
+          + e.getStackTrace()[0].getClassName() + "line number: " + 
e.getStackTrace()[0].getLineNumber(), e);
+      throw new IOException(e);
+    }
   }
 
   private static void 
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
     int maxAttempts = 
state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
     int currentAttempts = 
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
     // SHOULD_RETRY_FIELD maybe reset by JOB_COMPLETION_PERCENTAGE event
-    if 
((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
+    if (state.contains(JobStatusRetriever.EVENT_NAME_FIELD) 
&&(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
         || 
state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())
         || 
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.CANCELLED.name())
 && 
state.contains(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY))
     ) && currentAttempts < maxAttempts) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 765b526df..d226c8322 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -160,7 +160,7 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
     try {
       KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore());
     } catch (IOException e) {
-      Assert.assertTrue(e.getCause().getMessage().contains("Data too long"));
+      Assert.assertTrue(e.getCause().getCause().getMessage().contains("Data 
too long"));
       return;
     }
     Assert.fail();

Reply via email to