[ 
https://issues.apache.org/jira/browse/GOBBLIN-1652?focusedWorklogId=774749&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774749
 ]

ASF GitHub Bot logged work on GOBBLIN-1652:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/May/22 18:17
            Start Date: 25/May/22 18:17
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3513:
URL: https://github.com/apache/gobblin/pull/3513#discussion_r881976009


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -221,56 +221,63 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
   @VisibleForTesting
   static void addJobStatusToStateStore(org.apache.gobblin.configuration.State 
jobStatus, StateStore stateStore)
       throws IOException {
-    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
-      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
JobStatusRetriever.NA_KEY);
-    }
-    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
-      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
JobStatusRetriever.NA_KEY);
-    }
-    String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
-    String flowGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
-    String flowExecutionId = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
-    String jobName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
-    String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
-    String storeName = jobStatusStoreName(flowGroup, flowName);
-    String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
-    List<org.apache.gobblin.configuration.State> states = 
stateStore.getAll(storeName, tableName);
-    if (states.size() > 0) {
-      org.apache.gobblin.configuration.State previousJobStatus = 
states.get(states.size() - 1);
-      String previousStatus = 
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-      String currentStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-      int previousGeneration = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
 1);
-      // This is to make the change backward compatible as we may not have 
this info in cluster events
-      // If we does not have those info, we treat the event as coming from the 
same attempts as previous one
-      int currentGeneration = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 
previousGeneration);
-      int previousAttempts = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
 1);
-      int currentAttempts = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 
previousAttempts);
-      // We use three things to accurately count and thereby bound retries, 
even amidst out-of-order events (by skipping late arrivals).
-      // The generation is monotonically increasing, while the attempts may 
re-initialize back to 0. this two-part form prevents the composite value from 
ever repeating.
-      // And job status reflect the execution status in one attempt
-      if (previousStatus != null && currentStatus != null &&
-          (previousGeneration > currentGeneration
-              || (previousGeneration == currentGeneration && previousAttempts 
> currentAttempts)
-              || (previousGeneration == currentGeneration && previousAttempts 
== currentAttempts
-              && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
-        log.warn(String.format("Received status [generation.attempts] = %s 
[%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
-            currentStatus, currentGeneration, currentAttempts, previousStatus, 
previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId, 
jobGroup, jobName));
-        jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
-      } else {
-        jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+    try {
+      if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+        jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
JobStatusRetriever.NA_KEY);
+      }
+      if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) 
{
+        jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
JobStatusRetriever.NA_KEY);
+      }
+      String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+      String flowGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+      String flowExecutionId = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+      String jobName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+      String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+      String storeName = jobStatusStoreName(flowGroup, flowName);
+      String tableName = jobStatusTableName(flowExecutionId, jobGroup, 
jobName);
+
+      List<org.apache.gobblin.configuration.State> states = 
stateStore.getAll(storeName, tableName);
+      if (states.size() > 0) {
+        org.apache.gobblin.configuration.State previousJobStatus = 
states.get(states.size() - 1);
+        String previousStatus = 
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+        String currentStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+        int previousGeneration = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
 1);
+        // This is to make the change backward compatible as we may not have 
this info in cluster events
+        // If we does not have those info, we treat the event as coming from 
the same attempts as previous one
+        int currentGeneration = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 
previousGeneration);
+        int previousAttempts = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
 1);
+        int currentAttempts = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 
previousAttempts);
+        // We use three things to accurately count and thereby bound retries, 
even amidst out-of-order events (by skipping late arrivals).
+        // The generation is monotonically increasing, while the attempts may 
re-initialize back to 0. this two-part form prevents the composite value from 
ever repeating.
+        // And job status reflect the execution status in one attempt
+        if (previousStatus != null && currentStatus != null && 
(previousGeneration > currentGeneration || (
+            previousGeneration == currentGeneration && previousAttempts > 
currentAttempts) || (previousGeneration == currentGeneration && 
previousAttempts == currentAttempts
+            && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+            < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+          log.warn(String.format(
+              "Received status [generation.attempts] = %s [%s.%s] when already 
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+              currentStatus, currentGeneration, currentAttempts, 
previousStatus, previousGeneration, previousAttempts,
+              flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+          jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+        } else {
+          jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+        }
       }
-    }
 
-    modifyStateIfRetryRequired(jobStatus);
-    stateStore.put(storeName, tableName, jobStatus);
+      modifyStateIfRetryRequired(jobStatus);
+      stateStore.put(storeName, tableName, jobStatus);
+    } catch (Exception e) {

Review Comment:
   We are seeing NPEs without any diagnostic information, so we want to wrap the entire method body in a try/catch.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 774749)
    Time Spent: 40m  (was: 0.5h)

> Add more log in the KafkaJobStatusMonitor in case it fails to process one 
> GobblinTrackingEvent
> ----------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1652
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1652
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> In some corner cases, we fail to process a Kafka message, and it is very 
> hard to determine why the failure happened. To get more information, we want 
> to add more logging when processing a message. Along with this, we need to 
> add a null check in 
> modifyStateIfRetryRequired to avoid an NPE as well.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to