phet commented on code in PR #3513:
URL: https://github.com/apache/gobblin/pull/3513#discussion_r881190936
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -221,56 +221,63 @@ protected void
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
@VisibleForTesting
static void addJobStatusToStateStore(org.apache.gobblin.configuration.State
jobStatus, StateStore stateStore)
throws IOException {
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
- }
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
- }
- String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
- String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
- String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
- String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
- String storeName = jobStatusStoreName(flowGroup, flowName);
- String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
- List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
- if (states.size() > 0) {
- org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
- String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
- // This is to make the change backward compatible as we may not have
this info in cluster events
- // If we does not have those info, we treat the event as coming from the
same attempts as previous one
- int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
- int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
- int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
- // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
- // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
- // And job status reflect the execution status in one attempt
- if (previousStatus != null && currentStatus != null &&
- (previousGeneration > currentGeneration
- || (previousGeneration == currentGeneration && previousAttempts
> currentAttempts)
- || (previousGeneration == currentGeneration && previousAttempts
== currentAttempts
- &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
- log.warn(String.format("Received status [generation.attempts] = %s
[%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
- currentStatus, currentGeneration, currentAttempts, previousStatus,
previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId,
jobGroup, jobName));
- jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
- } else {
- jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ try {
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD))
{
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ String storeName = jobStatusStoreName(flowGroup, flowName);
+ String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
+
+ List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
+ if (states.size() > 0) {
+ org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
+ String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ // This is to make the change backward compatible as we may not have
this info in cluster events
+ // If we does not have those info, we treat the event as coming from
the same attempts as previous one
+ int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
+ int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
+ // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
+ // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
+ // And job status reflect the execution status in one attempt
+ if (previousStatus != null && currentStatus != null &&
(previousGeneration > currentGeneration || (
+ previousGeneration == currentGeneration && previousAttempts >
currentAttempts) || (previousGeneration == currentGeneration &&
previousAttempts == currentAttempts
+ &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+ <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+ log.warn(String.format(
+ "Received status [generation.attempts] = %s [%s.%s] when already
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+ currentStatus, currentGeneration, currentAttempts,
previousStatus, previousGeneration, previousAttempts,
+ flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+ jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+ } else {
+ jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ }
}
- }
- modifyStateIfRetryRequired(jobStatus);
- stateStore.put(storeName, tableName, jobStatus);
+ modifyStateIfRetryRequired(jobStatus);
+ stateStore.put(storeName, tableName, jobStatus);
+ } catch (Exception e) {
+ log.warn("Meet exception when adding jobStatus to state store at "
+ + e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber(), e);
Review Comment:
Won't the exception's stack trace already be logged, since you pass it as the final
arg? If so, manually appending the class name and line number to the message seems redundant.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -221,56 +221,63 @@ protected void
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
@VisibleForTesting
static void addJobStatusToStateStore(org.apache.gobblin.configuration.State
jobStatus, StateStore stateStore)
throws IOException {
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
- }
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
- }
- String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
- String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
- String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
- String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
- String storeName = jobStatusStoreName(flowGroup, flowName);
- String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
- List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
- if (states.size() > 0) {
- org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
- String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
- // This is to make the change backward compatible as we may not have
this info in cluster events
- // If we does not have those info, we treat the event as coming from the
same attempts as previous one
- int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
- int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
- int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
- // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
- // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
- // And job status reflect the execution status in one attempt
- if (previousStatus != null && currentStatus != null &&
- (previousGeneration > currentGeneration
- || (previousGeneration == currentGeneration && previousAttempts
> currentAttempts)
- || (previousGeneration == currentGeneration && previousAttempts
== currentAttempts
- &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
- log.warn(String.format("Received status [generation.attempts] = %s
[%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
- currentStatus, currentGeneration, currentAttempts, previousStatus,
previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId,
jobGroup, jobName));
- jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
- } else {
- jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ try {
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD))
{
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ String storeName = jobStatusStoreName(flowGroup, flowName);
+ String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
+
+ List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
+ if (states.size() > 0) {
+ org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
+ String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ // This is to make the change backward compatible as we may not have
this info in cluster events
+ // If we does not have those info, we treat the event as coming from
the same attempts as previous one
+ int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
+ int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
+ // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
+ // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
+ // And job status reflect the execution status in one attempt
+ if (previousStatus != null && currentStatus != null &&
(previousGeneration > currentGeneration || (
+ previousGeneration == currentGeneration && previousAttempts >
currentAttempts) || (previousGeneration == currentGeneration &&
previousAttempts == currentAttempts
+ &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+ <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+ log.warn(String.format(
+ "Received status [generation.attempts] = %s [%s.%s] when already
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+ currentStatus, currentGeneration, currentAttempts,
previousStatus, previousGeneration, previousAttempts,
+ flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+ jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+ } else {
+ jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ }
}
- }
- modifyStateIfRetryRequired(jobStatus);
- stateStore.put(storeName, tableName, jobStatus);
+ modifyStateIfRetryRequired(jobStatus);
+ stateStore.put(storeName, tableName, jobStatus);
+ } catch (Exception e) {
Review Comment:
What kind of exception were we seeing here? I'm curious whether the entire
block above needs wrapping, or perhaps just the tail-end subset of it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]