This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 977928ed3e [GOBBLIN-2127] do not emit GaaSObservabilityEvent for a
failed job that is going to be retried (#4018)
977928ed3e is described below
commit 977928ed3ecee82bd44132906312c088f1d5ca28
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Aug 7 15:18:27 2024 -0700
[GOBBLIN-2127] do not emit GaaSObservabilityEvent for a failed job that is
going to be retried (#4018)
* modify job status to pending_retry in the right order
---
.../runtime/KafkaAvroJobStatusMonitorTest.java | 12 +++++++-----
.../service/monitoring/FlowStatusGenerator.java | 2 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 20 ++++++++++++++++----
.../service/monitoring/JobStatusRetrieverTest.java | 7 ++++---
4 files changed, 28 insertions(+), 13 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index d2f1588de4..ce820b1140 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -477,28 +477,30 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
- Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+ // this is a cancelled job, so its JobStartDeadlineDagAction should get
removed
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
- Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
// second pending retry, creates a second reevaluate dag proc
Mockito.verify(dagManagementStateStore,
Mockito.times(2)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
- Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+ // cancelled again, so JobStartDeadlineDagAction is removed again
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
Mockito.verify(dagManagementStateStore,
Mockito.times(2)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
- Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
// Received kill flow event, should not retry the flow even though there
is 1 pending attempt left
@@ -506,7 +508,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(false));
Mockito.verify(dagManagementStateStore,
Mockito.times(3)).addJobDagAction(any(), any(),
anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
- Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(3)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
jobStatusMonitor.shutDown();
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 3be35de041..b4ec8c3530 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.service.ExecutionStatus;
@Slf4j
public class FlowStatusGenerator {
public static final List<String> FINISHED_STATUSES =
Lists.newArrayList(ExecutionStatus.FAILED.name(),
- ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(),
ExecutionStatus.PENDING_RETRY.name());
+ ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
public static final int MAX_LOOKBACK = 100;
private final JobStatusRetriever jobStatusRetriever;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 7061cd61f5..b897f74381 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -228,7 +228,15 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
String status =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- if (updatedJobStatus.getRight() == NewState.FINISHED) {
+ // modify the status to be PENDING_RETRY only after calculating
`updatedJobStatus` via `recalcJobStatus()`
+ // because ObservabilityEventProducer does not and should not
understand `PENDING_RETRY` status in convertExecutionStatusTojobState()
+ // which is called inside emitObservabilityEvent()
+ // this can also be addressed by some other new job status like
FAILED_PENDING_RETRY which does not alert the user
+ // as much as FAILED does if we choose to emit ObservabilityEvent for
FAILED_PENDING_RETRY
+ boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
+
+ if (updatedJobStatus.getRight() == NewState.FINISHED &&
!retryRequired) {
+ // do not send event if retry is required, because it can alert
users to re-submit a job that is already set to be retried by GaaS
this.eventProducer.emitObservabilityEvent(jobStatus);
}
@@ -334,7 +342,6 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
- modifyStateIfRetryRequired(jobStatus);
NewState newState = newState(jobStatus, states);
String newStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
if (newState == NewState.FINISHED) {
@@ -363,19 +370,24 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
&& currentStatus.equals(ExecutionStatus.PENDING_RESUME.name());
}
- private static void
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
+ // if job retry is required, it sets the job status to PENDING_RETRY and
returns true
+ @VisibleForTesting
+ static boolean
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
int maxAttempts =
state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
int currentAttempts =
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
+ boolean retryRequired = false;
// SHOULD_RETRY_FIELD maybe reset by JOB_COMPLETION_PERCENTAGE event
- if (state.contains(JobStatusRetriever.EVENT_NAME_FIELD)
&&(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
+ if (state.contains(JobStatusRetriever.EVENT_NAME_FIELD) &&
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
||
state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())
||
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.CANCELLED.name())
&&
state.contains(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY))
) && currentAttempts < maxAttempts) {
state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
state.setProp(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.PENDING_RETRY.name());
state.removeProp(TimingEvent.JOB_END_TIME);
+ retryRequired = true;
}
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
+ return retryRequired;
}
static boolean
isNewStateTransitionToFinal(org.apache.gobblin.configuration.State
currentState, List<org.apache.gobblin.configuration.State> prevStates) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 1b8a496f93..85891023b1 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -104,6 +104,7 @@ public abstract class JobStatusRetrieverTest {
State jobStatus = new State(properties);
Pair<State, KafkaJobStatusMonitor.NewState> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
jobStatus = updatedJobStatus.getLeft();
+ KafkaJobStatusMonitor.modifyStateIfRetryRequired(jobStatus);
this.jobStatusRetriever.getStateStore().put(
KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup,
jobName),
@@ -131,7 +132,7 @@ public abstract class JobStatusRetrieverTest {
jobStatus = jobStatusIterator.next();
}
Assert.assertEquals(jobStatus.getEventName(),
ExecutionStatus.PENDING_RETRY.name());
- Assert.assertEquals(jobStatus.isShouldRetry(), true);
+ Assert.assertTrue(jobStatus.isShouldRetry());
properties = createAttemptsProperties(1, 1, false);
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, properties);
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME,
JOB_ORCHESTRATED_TIME, properties);
@@ -141,7 +142,7 @@ public abstract class JobStatusRetrieverTest {
jobStatus = jobStatusIterator.next();
}
Assert.assertEquals(jobStatus.getEventName(),
ExecutionStatus.RUNNING.name());
- Assert.assertEquals(jobStatus.isShouldRetry(), false);
+ Assert.assertFalse(jobStatus.isShouldRetry());
Assert.assertEquals(jobStatus.getCurrentAttempts(), 1);
Properties properties_new = createAttemptsProperties(2, 0, false);
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.PENDING_RESUME.name(), JOB_START_TIME, JOB_START_TIME,
properties_new);
@@ -186,7 +187,7 @@ public abstract class JobStatusRetrieverTest {
Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
Assert.assertFalse(jobStatusIterator.hasNext());
Assert.assertEquals(ExecutionStatus.RUNNING,
-
this.jobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
+
JobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2,
ExecutionStatus.RUNNING.name());
jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId);