[
https://issues.apache.org/jira/browse/GOBBLIN-1652?focusedWorklogId=779726&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-779726
]
ASF GitHub Bot logged work on GOBBLIN-1652:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 09/Jun/22 00:44
Start Date: 09/Jun/22 00:44
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3513:
URL: https://github.com/apache/gobblin/pull/3513#discussion_r892976761
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -221,56 +221,63 @@ protected void
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
@VisibleForTesting
static void addJobStatusToStateStore(org.apache.gobblin.configuration.State
jobStatus, StateStore stateStore)
throws IOException {
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
- }
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
- }
- String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
- String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
- String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
- String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
- String storeName = jobStatusStoreName(flowGroup, flowName);
- String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
- List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
- if (states.size() > 0) {
- org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
- String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
- // This is to make the change backward compatible as we may not have
this info in cluster events
- // If we does not have those info, we treat the event as coming from the
same attempts as previous one
- int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
- int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
- int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
- // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
- // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
- // And job status reflect the execution status in one attempt
- if (previousStatus != null && currentStatus != null &&
- (previousGeneration > currentGeneration
- || (previousGeneration == currentGeneration && previousAttempts
> currentAttempts)
- || (previousGeneration == currentGeneration && previousAttempts
== currentAttempts
- &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
- log.warn(String.format("Received status [generation.attempts] = %s
[%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
- currentStatus, currentGeneration, currentAttempts, previousStatus,
previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId,
jobGroup, jobName));
- jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
- } else {
- jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ try {
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD))
{
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ String storeName = jobStatusStoreName(flowGroup, flowName);
+ String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
+
+ List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
+ if (states.size() > 0) {
+ org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
+ String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ // This is to make the change backward compatible as we may not have
this info in cluster events
+ // If we does not have those info, we treat the event as coming from
the same attempts as previous one
+ int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
+ int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
+ // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
+ // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
+ // And job status reflect the execution status in one attempt
+ if (previousStatus != null && currentStatus != null &&
(previousGeneration > currentGeneration || (
+ previousGeneration == currentGeneration && previousAttempts >
currentAttempts) || (previousGeneration == currentGeneration &&
previousAttempts == currentAttempts
+ &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+ <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+ log.warn(String.format(
+ "Received status [generation.attempts] = %s [%s.%s] when already
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+ currentStatus, currentGeneration, currentAttempts,
previousStatus, previousGeneration, previousAttempts,
+ flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+ jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+ } else {
+ jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ }
}
- }
- modifyStateIfRetryRequired(jobStatus);
- stateStore.put(storeName, tableName, jobStatus);
+ modifyStateIfRetryRequired(jobStatus);
+ stateStore.put(storeName, tableName, jobStatus);
+ } catch (Exception e) {
+ log.warn("Meet exception when adding jobStatus to state store at "
+ + e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber(), e);
Review Comment:
Makes sense to log that info. I was just thinking that the full stack trace
(which `warn` prints from `e`) should already contain the info in
```
e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber()
```
Issue Time Tracking
-------------------
Worklog Id: (was: 779726)
Time Spent: 1h (was: 50m)
> Add more logging in KafkaJobStatusMonitor in case it fails to process a
> GobblinTrackingEvent
> ----------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1652
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1652
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Currently, in some corner cases, we fail to process a Kafka message, and it is
> very hard to determine why it fails. To get more information, we want to
> add more logging when processing messages. Along with this, we need to add a null
> check in
> modifyStateIfRetryRequired to avoid an NPE as well.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)