phet commented on code in PR #3513:
URL: https://github.com/apache/gobblin/pull/3513#discussion_r892976761
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -221,56 +221,63 @@ protected void
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
@VisibleForTesting
static void addJobStatusToStateStore(org.apache.gobblin.configuration.State
jobStatus, StateStore stateStore)
throws IOException {
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
- }
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
- }
- String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
- String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
- String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
- String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
- String storeName = jobStatusStoreName(flowGroup, flowName);
- String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
- List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
- if (states.size() > 0) {
- org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
- String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
- // This is to make the change backward compatible as we may not have
this info in cluster events
- // If we does not have those info, we treat the event as coming from the
same attempts as previous one
- int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
- int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
- int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
- // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
- // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
- // And job status reflect the execution status in one attempt
- if (previousStatus != null && currentStatus != null &&
- (previousGeneration > currentGeneration
- || (previousGeneration == currentGeneration && previousAttempts
> currentAttempts)
- || (previousGeneration == currentGeneration && previousAttempts
== currentAttempts
- &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
- log.warn(String.format("Received status [generation.attempts] = %s
[%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
- currentStatus, currentGeneration, currentAttempts, previousStatus,
previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId,
jobGroup, jobName));
- jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
- } else {
- jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ try {
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD))
{
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ String storeName = jobStatusStoreName(flowGroup, flowName);
+ String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
+
+ List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
+ if (states.size() > 0) {
+ org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
+ String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ // This is to make the change backward compatible as we may not have
this info in cluster events
+ // If we does not have those info, we treat the event as coming from
the same attempts as previous one
+ int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
+ int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
+ // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
+ // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
+ // And job status reflect the execution status in one attempt
+ if (previousStatus != null && currentStatus != null &&
(previousGeneration > currentGeneration || (
+ previousGeneration == currentGeneration && previousAttempts >
currentAttempts) || (previousGeneration == currentGeneration &&
previousAttempts == currentAttempts
+ &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+ <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+ log.warn(String.format(
+ "Received status [generation.attempts] = %s [%s.%s] when already
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+ currentStatus, currentGeneration, currentAttempts,
previousStatus, previousGeneration, previousAttempts,
+ flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+ jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+ } else {
+ jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ }
}
- }
- modifyStateIfRetryRequired(jobStatus);
- stateStore.put(storeName, tableName, jobStatus);
+ modifyStateIfRetryRequired(jobStatus);
+ stateStore.put(storeName, tableName, jobStatus);
+ } catch (Exception e) {
+ log.warn("Meet exception when adding jobStatus to state store at "
+ + e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber(), e);
Review Comment:
Makes sense to log that info. I was just thinking that the full stack trace
(which `warn` already outputs from `e`) should already contain the info in
```
e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]