This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 13faea46b [GOBBLIN-1782] Fix Merge State for Flow Pending Resume 
statuses (#3639)
13faea46b is described below

commit 13faea46bd2f23999fb1bf9ea579296fb86d1e3d
Author: meethngala <[email protected]>
AuthorDate: Wed Feb 8 14:03:19 2023 -0800

    [GOBBLIN-1782] Fix Merge State for Flow Pending Resume statuses (#3639)
    
    * fix merge state for flow pending resume flow statuses
    
    * address feedback on the PR
    
    ---------
    
    Co-authored-by: Meeth Gala <[email protected]>
---
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 64 ++++++++++++++++++++++
 .../service/monitoring/KafkaJobStatusMonitor.java  | 13 ++++-
 2 files changed, 75 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 6f1069e15..b9a3521a8 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -429,6 +429,62 @@ public class KafkaAvroJobStatusMonitorTest {
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForFlowPendingResume() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1, 2),
+        createJobCancelledEvent(),
+        createFlowPendingResumeEvent(),
+        createJobOrchestratedEvent(2, 2),
+        createJobStartEvent(),
+        createJobSucceededEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
+
+    // Job for flow pending resume status after it was cancelled or failed
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", 
"NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RESUME.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.RUNNING.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
+
+    jobStatusMonitor.shutDown();
+  }
+
   @Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
   public void testProcessProgressingMessageWhenNoPreviousStatus() throws 
IOException, ReflectiveOperationException {
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic5");
@@ -619,6 +675,14 @@ public class KafkaAvroJobStatusMonitorTest {
     return createGTE(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED, 
Maps.newHashMap());
   }
 
+  private GobblinTrackingEvent createFlowPendingResumeEvent() {
+    GobblinTrackingEvent event = 
createGTE(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, Maps.newHashMap());
+    event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+    event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+    return event;
+
+  }
+
   private GobblinTrackingEvent createGTE(String eventName, Map<String, String> 
customMetadata) {
     String namespace = "org.apache.gobblin.metrics";
     Long timestamp = System.currentTimeMillis();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 7d1654af3..87e1c3b5a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -251,13 +251,17 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
         int currentGeneration = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 
previousGeneration);
         int previousAttempts = 
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
 1);
         int currentAttempts = 
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 
previousAttempts);
+        // Verify whether the current job status is a flow status. If so, check 
whether its current execution status is PENDING_RESUME (limiting this to resume 
flow statuses only).
+        // When both conditions are satisfied, we need NOT check for 
out-of-order events, since GaaS manages the lifecycle of these events.
+        // Hence, we update the merge state accordingly so that the flow can 
proceed with its execution to the next state in the DAG.
+        boolean isFlowStatusAndPendingResume = 
isFlowStatusAndPendingResume(jobName, jobGroup, currentStatus);
         // We use three things to accurately count and thereby bound retries, 
even amidst out-of-order events (by skipping late arrivals).
         // The generation is monotonically increasing, while the attempts may 
re-initialize back to 0. this two-part form prevents the composite value from 
ever repeating.
         // And job status reflect the execution status in one attempt
-        if (previousStatus != null && currentStatus != null && 
(previousGeneration > currentGeneration || (
+         if (!isFlowStatusAndPendingResume && (previousStatus != null && 
currentStatus != null && (previousGeneration > currentGeneration || (
             previousGeneration == currentGeneration && previousAttempts > 
currentAttempts) || (previousGeneration == currentGeneration && 
previousAttempts == currentAttempts
             && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
-            < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+            < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus)))))) 
{
           log.warn(String.format(
               "Received status [generation.attempts] = %s [%s.%s] when already 
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
               currentStatus, currentGeneration, currentAttempts, 
previousStatus, previousGeneration, previousAttempts,
@@ -280,6 +284,11 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     }
   }
 
+  private static boolean isFlowStatusAndPendingResume(String jobName, String 
jobGroup, String currentStatus) {
+    return jobName != null && jobGroup != null && 
jobName.equals(JobStatusRetriever.NA_KEY) && 
jobGroup.equals(JobStatusRetriever.NA_KEY)
+        && currentStatus.equals(ExecutionStatus.PENDING_RESUME.name());
+  }
+
   private static void 
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
     int maxAttempts = 
state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
     int currentAttempts = 
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);

Reply via email to