This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 977928ed3e [GOBBLIN-2127] do not emit GaaSObservabilityEvent for a failed job that is going to be retried (#4018)
977928ed3e is described below

commit 977928ed3ecee82bd44132906312c088f1d5ca28
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Aug 7 15:18:27 2024 -0700

    [GOBBLIN-2127] do not emit GaaSObservabilityEvent for a failed job that is going to be retried (#4018)
    
    * modify job status to pending_retry in the right order
---
 .../runtime/KafkaAvroJobStatusMonitorTest.java       | 12 +++++++-----
 .../service/monitoring/FlowStatusGenerator.java      |  2 +-
 .../service/monitoring/KafkaJobStatusMonitor.java    | 20 ++++++++++++++++----
 .../service/monitoring/JobStatusRetrieverTest.java   |  7 ++++---
 4 files changed, 28 insertions(+), 13 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index d2f1588de4..ce820b1140 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -477,28 +477,30 @@ public class KafkaAvroJobStatusMonitorTest {
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
     Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+    // this is a cancelled job, so its JobStartDeadlineDagAction should get 
removed
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
     Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
     // second pending retry, creates a second reevaluate dag proc
     Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+    // cancelled again, so JobStartDeadlineDagAction is removed again
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
     Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     // Received kill flow event, should not retry the flow even though there 
is 1 pending attempt left
@@ -506,7 +508,7 @@ public class KafkaAvroJobStatusMonitorTest {
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(false));
     Mockito.verify(dagManagementStateStore, 
Mockito.times(3)).addJobDagAction(any(), any(),
         anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
-    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(3)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     jobStatusMonitor.shutDown();
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 3be35de041..b4ec8c3530 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.service.ExecutionStatus;
 @Slf4j
 public class FlowStatusGenerator {
   public static final List<String> FINISHED_STATUSES = 
Lists.newArrayList(ExecutionStatus.FAILED.name(),
-      ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), 
ExecutionStatus.PENDING_RETRY.name());
+      ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
   public static final int MAX_LOOKBACK = 100;
 
   private final JobStatusRetriever jobStatusRetriever;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 7061cd61f5..b897f74381 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -228,7 +228,15 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
           String tableName = jobStatusTableName(flowExecutionId, jobGroup, 
jobName);
           String status = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
 
-          if (updatedJobStatus.getRight() == NewState.FINISHED) {
+          // modify the status to be PENDING_RETRY only after calculating 
`updatedJobStatus via recalcJobStatus()`
+          // because ObservabilityEventProducer does not and should not 
understand `PENDING_RETRY` status in convertExecutionStatusTojobState()
+          // which is called inside emitObservabilityEvent()
+          // this can also be addressed by some other new job status like 
FAILED_PENDING_RETRY which does not alert the user
+          // as much as FAILED does if we chose to emit ObservabilityEvent for 
FAILED_PENDING_RETRY
+          boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
+
+          if (updatedJobStatus.getRight() == NewState.FINISHED && 
!retryRequired) {
+            // do not send event if retry is required, because it can alert 
users to re-submit a job that is already set to be retried by GaaS
             this.eventProducer.emitObservabilityEvent(jobStatus);
           }
 
@@ -334,7 +342,6 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         }
       }
 
-      modifyStateIfRetryRequired(jobStatus);
       NewState newState = newState(jobStatus, states);
       String newStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
       if (newState == NewState.FINISHED) {
@@ -363,19 +370,24 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         && currentStatus.equals(ExecutionStatus.PENDING_RESUME.name());
   }
 
-  private static void 
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
+  // if job retry is required, it sets the job status to PENDING_RETRY and 
returns true
+  @VisibleForTesting
+  static boolean 
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
     int maxAttempts = 
state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
     int currentAttempts = 
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
+    boolean retryRequired = false;
     // SHOULD_RETRY_FIELD maybe reset by JOB_COMPLETION_PERCENTAGE event
-    if (state.contains(JobStatusRetriever.EVENT_NAME_FIELD) 
&&(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
+    if (state.contains(JobStatusRetriever.EVENT_NAME_FIELD) && 
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
         || 
state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())
         || 
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.CANCELLED.name())
 && 
state.contains(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY))
     ) && currentAttempts < maxAttempts) {
       state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
       state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.PENDING_RETRY.name());
       state.removeProp(TimingEvent.JOB_END_TIME);
+      retryRequired = true;
     }
     
state.removeProp(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY);
+    return retryRequired;
   }
 
   static boolean 
isNewStateTransitionToFinal(org.apache.gobblin.configuration.State 
currentState, List<org.apache.gobblin.configuration.State> prevStates) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 1b8a496f93..85891023b1 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -104,6 +104,7 @@ public abstract class JobStatusRetrieverTest {
     State jobStatus = new State(properties);
     Pair<State, KafkaJobStatusMonitor.NewState> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
     jobStatus = updatedJobStatus.getLeft();
+    KafkaJobStatusMonitor.modifyStateIfRetryRequired(jobStatus);
     this.jobStatusRetriever.getStateStore().put(
         KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
         KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, 
jobName),
@@ -131,7 +132,7 @@ public abstract class JobStatusRetrieverTest {
       jobStatus = jobStatusIterator.next();
     }
     Assert.assertEquals(jobStatus.getEventName(), 
ExecutionStatus.PENDING_RETRY.name());
-    Assert.assertEquals(jobStatus.isShouldRetry(), true);
+    Assert.assertTrue(jobStatus.isShouldRetry());
     properties = createAttemptsProperties(1, 1, false);
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, properties);
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, 
JOB_ORCHESTRATED_TIME, properties);
@@ -141,7 +142,7 @@ public abstract class JobStatusRetrieverTest {
       jobStatus = jobStatusIterator.next();
     }
     Assert.assertEquals(jobStatus.getEventName(), 
ExecutionStatus.RUNNING.name());
-    Assert.assertEquals(jobStatus.isShouldRetry(), false);
+    Assert.assertFalse(jobStatus.isShouldRetry());
     Assert.assertEquals(jobStatus.getCurrentAttempts(), 1);
     Properties properties_new = createAttemptsProperties(2, 0, false);
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.PENDING_RESUME.name(), JOB_START_TIME, JOB_START_TIME, 
properties_new);
@@ -186,7 +187,7 @@ public abstract class JobStatusRetrieverTest {
     Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
     Assert.assertFalse(jobStatusIterator.hasNext());
     Assert.assertEquals(ExecutionStatus.RUNNING,
-        
this.jobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
+        
JobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2, 
ExecutionStatus.RUNNING.name());
     jobStatusIterator = 
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId);

Reply via email to