phet commented on code in PR #3513:
URL: https://github.com/apache/gobblin/pull/3513#discussion_r892976761


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -221,56 +221,63 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
   @VisibleForTesting
   static void addJobStatusToStateStore(org.apache.gobblin.configuration.State 
jobStatus, StateStore stateStore)
       throws IOException {
-    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
-      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
JobStatusRetriever.NA_KEY);
-    }
-    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
-      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
JobStatusRetriever.NA_KEY);
-    }
-    String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
-    String flowGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
-    String flowExecutionId = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
-    String jobName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
-    String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
-    String storeName = jobStatusStoreName(flowGroup, flowName);
-    String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
-    List<org.apache.gobblin.configuration.State> states = 
stateStore.getAll(storeName, tableName);
-    if (states.size() > 0) {
-      org.apache.gobblin.configuration.State previousJobStatus = 
states.get(states.size() - 1);
-      String previousStatus = 
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-      String currentStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-      int previousGeneration = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
 1);
-      // This is to make the change backward compatible as we may not have 
this info in cluster events
-      // If we does not have those info, we treat the event as coming from the 
same attempts as previous one
-      int currentGeneration = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 
previousGeneration);
-      int previousAttempts = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
 1);
-      int currentAttempts = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 
previousAttempts);
-      // We use three things to accurately count and thereby bound retries, 
even amidst out-of-order events (by skipping late arrivals).
-      // The generation is monotonically increasing, while the attempts may 
re-initialize back to 0. this two-part form prevents the composite value from 
ever repeating.
-      // And job status reflect the execution status in one attempt
-      if (previousStatus != null && currentStatus != null &&
-          (previousGeneration > currentGeneration
-              || (previousGeneration == currentGeneration && previousAttempts 
> currentAttempts)
-              || (previousGeneration == currentGeneration && previousAttempts 
== currentAttempts
-              && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
-        log.warn(String.format("Received status [generation.attempts] = %s 
[%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
-            currentStatus, currentGeneration, currentAttempts, previousStatus, 
previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId, 
jobGroup, jobName));
-        jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
-      } else {
-        jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+    try {
+      if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+        jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
JobStatusRetriever.NA_KEY);
+      }
+      if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) 
{
+        jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
JobStatusRetriever.NA_KEY);
+      }
+      String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+      String flowGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+      String flowExecutionId = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+      String jobName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+      String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+      String storeName = jobStatusStoreName(flowGroup, flowName);
+      String tableName = jobStatusTableName(flowExecutionId, jobGroup, 
jobName);
+
+      List<org.apache.gobblin.configuration.State> states = 
stateStore.getAll(storeName, tableName);
+      if (states.size() > 0) {
+        org.apache.gobblin.configuration.State previousJobStatus = 
states.get(states.size() - 1);
+        String previousStatus = 
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+        String currentStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+        int previousGeneration = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
 1);
+        // This is to make the change backward compatible as we may not have 
this info in cluster events
+        // If we does not have those info, we treat the event as coming from 
the same attempts as previous one
+        int currentGeneration = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 
previousGeneration);
+        int previousAttempts = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
 1);
+        int currentAttempts = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 
previousAttempts);
+        // We use three things to accurately count and thereby bound retries, 
even amidst out-of-order events (by skipping late arrivals).
+        // The generation is monotonically increasing, while the attempts may 
re-initialize back to 0. this two-part form prevents the composite value from 
ever repeating.
+        // And job status reflect the execution status in one attempt
+        if (previousStatus != null && currentStatus != null && 
(previousGeneration > currentGeneration || (
+            previousGeneration == currentGeneration && previousAttempts > 
currentAttempts) || (previousGeneration == currentGeneration && 
previousAttempts == currentAttempts
+            && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+            < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+          log.warn(String.format(
+              "Received status [generation.attempts] = %s [%s.%s] when already 
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+              currentStatus, currentGeneration, currentAttempts, 
previousStatus, previousGeneration, previousAttempts,
+              flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+          jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+        } else {
+          jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+        }
       }
-    }
 
-    modifyStateIfRetryRequired(jobStatus);
-    stateStore.put(storeName, tableName, jobStatus);
+      modifyStateIfRetryRequired(jobStatus);
+      stateStore.put(storeName, tableName, jobStatus);
+    } catch (Exception e) {
+      log.warn("Meet exception when adding jobStatus to state store at "
+          + e.getStackTrace()[0].getClassName() + "line number: " + 
e.getStackTrace()[0].getLineNumber(), e);

Review Comment:
   Makes sense to log that info.  I was just thinking that the full stack trace 
(which `warn` already outputs from `e`) should already contain the info in
   ```
   e.getStackTrace()[0].getClassName() + "line number: " + 
e.getStackTrace()[0].getLineNumber()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to